diff options
| author | Ben Hood <0x6e6562@gmail.com> | 2009-01-21 13:17:21 +0000 |
|---|---|---|
| committer | Ben Hood <0x6e6562@gmail.com> | 2009-01-21 13:17:21 +0000 |
| commit | adc88315b04e35123a69bcd30dafa5be70fc8787 (patch) | |
| tree | deb0da0c09c9f380e915cdbb596477ab02af4f17 | |
| parent | 9604a10ae3fc86bc84ffdbbf017d8ad6081e6796 (diff) | |
| parent | 51b19aaff21998a54870db8ba893ee10a9531779 (diff) | |
| download | rabbitmq-server-git-adc88315b04e35123a69bcd30dafa5be70fc8787.tar.gz | |
Merged default into 20097
| -rw-r--r-- | .hgignore | 1 | ||||
| -rw-r--r-- | Makefile | 21 | ||||
| -rw-r--r-- | README.in (renamed from BUILD.in) | 0 | ||||
| -rw-r--r-- | docs/rabbitmqctl.1.pod | 20 | ||||
| -rw-r--r-- | ebin/rabbit.app | 58 | ||||
| -rw-r--r-- | ebin/rabbit_app.in | 20 | ||||
| -rw-r--r-- | generate_app | 10 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/Makefile | 23 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/init.d | 4 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 47 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/changelog | 6 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/init.d | 1 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/postinst | 4 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/watch | 4 | ||||
| -rw-r--r-- | packaging/windows/Makefile | 2 | ||||
| -rw-r--r-- | src/buffering_proxy.erl | 108 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 187 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 135 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_multi.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 2 |
25 files changed, 353 insertions, 367 deletions
@@ -9,6 +9,7 @@ syntax: regexp ^include/rabbit_framing.hrl$ ^src/rabbit_framing.erl$ ^rabbit.plt$ +^ebin/rabbit.app$ ^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$ ^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$ @@ -7,7 +7,8 @@ SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) -TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) +BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) +TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) @@ -39,6 +40,9 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e #all: $(EBIN_DIR)/rabbit.boot all: $(TARGETS) +$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app + escript generate_app $(EBIN_DIR) < $< > $@ + $(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl erlc $(ERLC_OPTS) $< @@ -47,20 +51,20 @@ $(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCL # ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) > $@ + $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) > $@ + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS) erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().' -dialyze: $(TARGETS) +dialyze: $(BEAM_TARGETS) dialyzer -c $? clean: cleandb rm -f $(EBIN_DIR)/*.beam - rm -f $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script + rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f docs/*.[0-9].gz @@ -123,10 +127,15 @@ srcdist: distclean cp INSTALL.in $(TARGET_SRC_DIR)/INSTALL elinks -dump -no-references -no-numbering $(WEB_URL)install.html \ >> $(TARGET_SRC_DIR)/INSTALL - cp BUILD.in $(TARGET_SRC_DIR)/BUILD + cp README.in $(TARGET_SRC_DIR)/README elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ +<<<<<<< local >> $(TARGET_SRC_DIR)/BUILD + sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in +======= + >> $(TARGET_SRC_DIR)/README sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app +>>>>>>> other cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ cp codegen.py Makefile $(TARGET_SRC_DIR) diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index b9edd5847f..fd8918cd9b 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -146,23 +146,27 @@ auto_delete arguments queue arguments -pid - Erlang process identifier associated with the queue +node + node on which the process associated with the queue resides messages_ready - number of ready messages + number of messages ready to be delivered to clients messages_unacknowledged - number of unacknowledged messages + number of messages delivered to clients but not yet acknowledged messages_uncommitted - number of uncommitted messages + number of messages published in as yet uncommitted transactions +<<<<<<< local +======= messages sum of ready, unacknowledged and uncommitted messages +>>>>>>> other acks_uncommitted - number of uncommitted acknowledgements + number of acknowledgements received in as yet uncommitted + transactions consumers number of consumers @@ -214,8 +218,8 @@ list_connections [I<connectioninfoitem> ...] =over 4 -pid - Erlang process id associated with the connection +node + node on which the process associated with the connection resides address server IP number diff --git a/ebin/rabbit.app b/ebin/rabbit.app deleted file mode 100644 index 64afb5ff72..0000000000 --- a/ebin/rabbit.app +++ /dev/null @@ -1,58 +0,0 @@ -{application, rabbit, %% -*- erlang -*- - [{description, "RabbitMQ"}, - {id, "RabbitMQ"}, - {vsn, "%%VERSION%%"}, - {modules, [buffering_proxy, - rabbit_access_control, - rabbit_alarm, - rabbit_amqqueue, - rabbit_amqqueue_process, - rabbit_amqqueue_sup, - rabbit_binary_generator, - rabbit_binary_parser, - rabbit_channel, - rabbit_control, - rabbit, - rabbit_error_logger, - rabbit_error_logger_file_h, - rabbit_exchange, - rabbit_framing_channel, - rabbit_framing, - rabbit_heartbeat, - rabbit_limiter, - rabbit_load, - rabbit_log, - rabbit_memsup_linux, - rabbit_misc, - rabbit_mnesia, - rabbit_multi, - rabbit_networking, - rabbit_node_monitor, - rabbit_persister, - rabbit_reader, - rabbit_router, - rabbit_sasl_report_file_h, - rabbit_sup, - rabbit_tests, - rabbit_tracer, - rabbit_writer, - tcp_acceptor, - tcp_acceptor_sup, - tcp_client_sup, - tcp_listener, - tcp_listener_sup]}, - {registered, [rabbit_amqqueue_sup, - rabbit_log, - rabbit_node_monitor, - rabbit_persister, - rabbit_router, - rabbit_sup, - rabbit_tcp_client_sup]}, - {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, - {mod, {rabbit, []}}, - {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, - {extra_startup_steps, []}, - {default_user, <<"guest">>}, - {default_pass, <<"guest">>}, - {default_vhost, <<"/">>}, - {memory_alarms, auto}]}]}. diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in new file mode 100644 index 0000000000..e2f36c0f5f --- /dev/null +++ b/ebin/rabbit_app.in @@ -0,0 +1,20 @@ +{application, rabbit, %% -*- erlang -*- + [{description, "RabbitMQ"}, + {id, "RabbitMQ"}, + {vsn, "%%VERSION%%"}, + {modules, []}, + {registered, [rabbit_amqqueue_sup, + rabbit_log, + rabbit_node_monitor, + rabbit_persister, + rabbit_router, + rabbit_sup, + rabbit_tcp_client_sup]}, + {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, + {mod, {rabbit, []}}, + {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, + {extra_startup_steps, []}, + {default_user, <<"guest">>}, + {default_pass, <<"guest">>}, + {default_vhost, <<"/">>}, + {memory_alarms, auto}]}]}. diff --git a/generate_app b/generate_app new file mode 100644 index 0000000000..623012927e --- /dev/null +++ b/generate_app @@ -0,0 +1,10 @@ +#!/usr/bin/env escript +%% -*- erlang -*- + +main([BeamDir]) -> + Modules = [list_to_atom(filename:basename(F, ".beam")) || + F <- filelib:wildcard("*.beam", BeamDir)], + {ok, {application, Application, Properties}} = io:read(''), + NewProperties = lists:keyreplace(modules, 1, Properties, + {modules, Modules}), + io:format("~p.", [{application, Application, NewProperties}]). diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index c05f14a7cb..cf3a93dfa1 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -6,21 +6,38 @@ TOP_DIR=$(shell pwd) #only checks build-dependencies using rpms, not debs DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'debian 1' +ifndef RPM_OS +RPM_OS=fedora +endif + +ifeq "x$(RPM_OS)" "xsuse" +REQUIRES=/sbin/chkconfig /sbin/service +OS_DEFINES=--define '_initrddir /etc/init.d' +RELEASE_OS=.suse +else +REQUIRES=chkconfig initscripts +OS_DEFINES=--define '_initrddir /etc/rc.d/init.d' +RELEASE_OS= +endif + rpms: clean server prepare: mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp cp $(TOP_DIR)/$(TARBALL) SOURCES cp rabbitmq-server.spec SPECS - sed -i 's/%%VERSION%%/$(VERSION)/' SPECS/rabbitmq-server.spec + sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|;s|%%RELEASE_OS%%|$(RELEASE_OS)|' \ + SPECS/rabbitmq-server.spec cp init.d SOURCES/rabbitmq-server.init cp rabbitmqctl_wrapper SOURCES/rabbitmq-server.wrapper cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare - rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target i386 - rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --define '_arch x86_64' \ + rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ + --target i386 + rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ + --define '_libdir /usr/lib64' --define '_arch x86_64' \ --define '_defaultdocdir /usr/share/doc' --target x86_64 clean: diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d index 27f150f997..a006a5a7a2 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/RPMS/Fedora/init.d @@ -16,7 +16,6 @@ # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO -PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin DAEMON_NAME=rabbitmq-multi DAEMON=/usr/lib/rabbitmq/bin/$DAEMON_NAME NAME=rabbitmq-server @@ -29,9 +28,6 @@ LOCK_FILE=/var/lock/subsys/$NAME test -x $DAEMON || exit 0 -# source function library -. /etc/rc.d/init.d/functions - # Include rabbitmq defaults if available if [ -f /etc/default/rabbitmq ] ; then . /etc/default/rabbitmq diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 13cfb0372e..fc109bdbcc 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -1,6 +1,6 @@ Name: rabbitmq-server Version: %%VERSION%% -Release: 1 +Release: 1%%RELEASE_OS%% License: MPLv1.1 Group: Development/Libraries Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz @@ -17,24 +17,18 @@ Requires: erlang, logrotate Packager: Hubert Plociniczak <hubert@lshift.net> BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root Summary: The RabbitMQ server -Requires(post): chkconfig -Requires(pre): chkconfig initscripts +Requires(post): %%REQUIRES%% +Requires(pre): %%REQUIRES%% %description RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. -%ifarch x86_64 - %define _defaultlibdir /usr/lib64 -%else - %define _defaultlibdir /usr/lib -%endif - -%define _erllibdir %{_defaultlibdir}/erlang/lib -%define _rabbitbindir %{_defaultlibdir}/rabbitmq/bin +%define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version} +%define _rabbit_libdir %{_libdir}/rabbitmq -%define _maindir %{buildroot}%{_erllibdir}/rabbitmq_server-%{version} +%define _maindir %{buildroot}%{_rabbit_erllibdir} %pre if [ $1 -gt 1 ]; then @@ -53,25 +47,21 @@ make rm -rf %{buildroot} make install TARGET_DIR=%{_maindir} \ - SBIN_DIR=%{buildroot}%{_rabbitbindir} \ + SBIN_DIR=%{buildroot}%{_rabbit_libdir}/bin \ MAN_DIR=%{buildroot}%{_mandir} mkdir -p %{buildroot}/var/lib/rabbitmq/mnesia mkdir -p %{buildroot}/var/log/rabbitmq -mkdir -p %{buildroot}/etc/rc.d/init.d/ +mkdir -p %{buildroot}%{_initrddir} #Copy all necessary lib files etc. -install -m 0755 %SOURCE1 %{buildroot}/etc/rc.d/init.d/rabbitmq-server -chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server -%ifarch x86_64 - sed -i 's/\/usr\/lib\//\/usr\/lib64\//' %{buildroot}/etc/rc.d/init.d/rabbitmq-server -%endif +install -m 0755 %SOURCE1 %{buildroot}%{_initrddir}/rabbitmq-server +chmod 0755 %{buildroot}%{_initrddir}/rabbitmq-server +sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_initrddir}/rabbitmq-server mkdir -p %{buildroot}%{_sbindir} install -m 0755 %SOURCE2 %{buildroot}%{_sbindir}/rabbitmqctl -%ifarch x86_64 - sed -i 's/\/usr\/lib\//\/usr\/lib64\//' %{buildroot}%{_sbindir}/rabbitmqctl -%endif +sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_sbindir}/rabbitmqctl mkdir -p %{buildroot}/etc/logrotate.d install -m 0644 %SOURCE3 %{buildroot}/etc/logrotate.d/rabbitmq-server @@ -81,8 +71,10 @@ rm %{_maindir}/LICENSE %{_maindir}/LICENSE-MPL-RabbitMQ %{_maindir}/INSTALL #Build the list of files rm -f %{_builddir}/filelist.%{name}.rpm echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm -(cd %{buildroot}; find . ! -regex '\./etc.*' \ - -type f | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm) +(cd %{buildroot}; \ + find . -type f ! -regex '\./etc.*' \ + ! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \ + | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm) %post # create rabbitmq group @@ -116,7 +108,9 @@ fi %defattr(-,root,root,-) %dir /var/lib/rabbitmq %dir /var/log/rabbitmq -/etc/rc.d/init.d/rabbitmq-server +%{_rabbit_erllibdir} +%{_rabbit_libdir} +%{_initrddir}/rabbitmq-server %config(noreplace) /etc/logrotate.d/rabbitmq-server %doc LICENSE LICENSE-MPL-RabbitMQ INSTALL @@ -124,6 +118,9 @@ fi rm -rf %{buildroot} %changelog +* Mon Jan 19 2009 Ben Hood <0x6e6562@gmail.com> 1.5.1-1 +- Maintenance release for the 1.5.x series + * Wed Dec 17 2008 Matthias Radestock <matthias@lshift.net> 1.5.0-1 - New upstream release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index e8be8d8d8c..37b01dabb8 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (1.5.1-1) hardy; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@lshift.net> Mon, 19 Jan 2009 15:46:13 +0000 + rabbitmq-server (1.5.0-1) testing; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d index ace474c59f..70dd0adf5f 100644 --- a/packaging/debs/Debian/debian/init.d +++ b/packaging/debs/Debian/debian/init.d @@ -9,7 +9,6 @@ # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO -PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin DAEMON=/usr/lib/rabbitmq/bin/rabbitmq-multi NAME=rabbitmq-server DESC=rabbitmq-server diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst index 495b8331f0..05fb179cbf 100644 --- a/packaging/debs/Debian/debian/postinst +++ b/packaging/debs/Debian/debian/postinst @@ -25,8 +25,8 @@ fi # create rabbitmq user if ! getent passwd rabbitmq >/dev/null; then - adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq --no-create-home rabbitmq - usermod -c "RabbitMQ messaging server" rabbitmq + adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq \ + --no-create-home --gecos "RabbitMQ messaging server" rabbitmq fi chown -R rabbitmq:rabbitmq /var/lib/rabbitmq diff --git a/packaging/debs/Debian/debian/watch b/packaging/debs/Debian/debian/watch new file mode 100644 index 0000000000..b41aff9aed --- /dev/null +++ b/packaging/debs/Debian/debian/watch @@ -0,0 +1,4 @@ +version=3 + +http://www.rabbitmq.com/releases/rabbitmq-server/v(.*)/rabbitmq-server-(\d.*)\.tar\.gz \ + debian uupdate diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index 9d16fd9fb3..59101cb2c6 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -15,7 +15,7 @@ dist: mv $(SOURCE_DIR)/scripts/rabbitmq-multi.bat $(SOURCE_DIR)/sbin rm -rf $(SOURCE_DIR)/scripts rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile - rm -f $(SOURCE_DIR)/BUILD + rm -f $(SOURCE_DIR)/README rm -rf $(SOURCE_DIR)/docs mv $(SOURCE_DIR) $(TARGET_DIR) diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl deleted file mode 100644 index 344b719a3c..0000000000 --- a/src/buffering_proxy.erl +++ /dev/null @@ -1,108 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(buffering_proxy). - --export([start_link/2]). - -%% internal - --export([mainloop/4, drain/2]). --export([proxy_loop/3]). - --define(HIBERNATE_AFTER, 5000). - -%%---------------------------------------------------------------------------- - -start_link(M, A) -> - spawn_link( - fun () -> process_flag(trap_exit, true), - ProxyPid = self(), - Ref = make_ref(), - Pid = spawn_link( - fun () -> ProxyPid ! Ref, - mainloop(ProxyPid, Ref, M, - M:init(ProxyPid, A)) end), - proxy_loop(Ref, Pid, empty) - end). - -%%---------------------------------------------------------------------------- - -mainloop(ProxyPid, Ref, M, State) -> - NewState = - receive - {Ref, Messages} -> - NewSt = - lists:foldl(fun (Msg, S) -> - drain(M, M:handle_message(Msg, S)) - end, State, lists:reverse(Messages)), - ProxyPid ! Ref, - NewSt; - Msg -> M:handle_message(Msg, State) - after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, - [ProxyPid, Ref, M, State]) - end, - ?MODULE:mainloop(ProxyPid, Ref, M, NewState). - -drain(M, State) -> - receive - Msg -> ?MODULE:drain(M, M:handle_message(Msg, State)) - after 0 -> - State - end. - -proxy_loop(Ref, Pid, State) -> - receive - Ref -> - ?MODULE:proxy_loop( - Ref, Pid, - case State of - empty -> waiting; - waiting -> exit(duplicate_next); - Messages -> Pid ! {Ref, Messages}, empty - end); - {'EXIT', Pid, Reason} -> - exit(Reason); - {'EXIT', _, Reason} -> - exit(Pid, Reason), - ?MODULE:proxy_loop(Ref, Pid, State); - Msg -> - ?MODULE:proxy_loop( - Ref, Pid, - case State of - empty -> [Msg]; - waiting -> Pid ! {Ref, [Msg]}, empty; - Messages -> [Msg | Messages] - end) - after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State]) - end. diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index b73090fc44..36270efddc 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -186,6 +186,8 @@ add_vhost(VHostPath) -> [{<<"">>, direct}, {<<"amq.direct">>, direct}, {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml {<<"amq.fanout">>, fanout}]], ok; [_] -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5d612fbb69..abbdce66d1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -91,7 +91,7 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(limit_all/3 :: ([pid()], pid(), pid()) -> ok_or_errors()). +-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). @@ -276,7 +276,7 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5199fb87b1..c390b2b7e4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --behaviour(gen_server). +-behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER, 1000). @@ -86,7 +86,7 @@ %%---------------------------------------------------------------------------- start_link(Q) -> - gen_server:start_link(?MODULE, Q, []). + gen_server2:start_link(?MODULE, Q, []). %%---------------------------------------------------------------------------- @@ -502,7 +502,8 @@ i(name, #q{q = #amqqueue{name = Name}}) -> Name; i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; -i(pid, #q{q = #amqqueue{pid = Pid}}) -> Pid; +i(pid, _) -> + self(); i(messages_ready, #q{message_buffer = MessageBuffer}) -> queue:len(MessageBuffer); i(messages_unacknowledged, _) -> @@ -513,8 +514,8 @@ i(messages_uncommitted, _) -> #tx{pending_messages = Pending} <- all_tx_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, - messages_unacknowledged, - messages_uncommitted]]); + messages_unacknowledged, + messages_uncommitted]]); i(acks_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_acks = Pending} <- all_tx_record()]); @@ -565,14 +566,14 @@ handle_call({deliver, Txn, Message}, _From, State) -> handle_call({commit, Txn}, From, State) -> ok = commit_work(Txn, qname(State)), %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), NewState = process_pending(Txn, State), erase_tx(Txn), noreply(NewState); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), handle_ch_down(ChPid, State); handle_call({basic_get, ChPid, NoAck}, _From, @@ -799,7 +800,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]); + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 513d305021..376e39c60d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,18 +33,21 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). +-behaviour(gen_server2). + -export([start_link/4, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). -%% callbacks --export([init/2, handle_message/2]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --record(ch, {state, proxy_pid, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping}). +-define(HIBERNATE_AFTER, 1000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -62,109 +65,101 @@ %%---------------------------------------------------------------------------- start_link(ReaderPid, WriterPid, Username, VHost) -> - buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid, - Username, VHost]). + {ok, Pid} = gen_server2:start_link( + ?MODULE, [ReaderPid, WriterPid, Username, VHost], []), + Pid. do(Pid, Method) -> do(Pid, Method, none). do(Pid, Method, Content) -> - Pid ! {method, Method, Content}, - ok. + gen_server2:cast(Pid, {method, Method, Content}). shutdown(Pid) -> - Pid ! terminate, - ok. + gen_server2:cast(Pid, terminate). send_command(Pid, Msg) -> - Pid ! {command, Msg}, - ok. + gen_server2:cast(Pid, {command, Msg}). deliver(Pid, ConsumerTag, AckRequired, Msg) -> - Pid ! {deliver, ConsumerTag, AckRequired, Msg}, - ok. + gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - Pid ! {conserve_memory, Conserve}, - ok. + gen_server2:cast(Pid, {conserve_memory, Conserve}). %%--------------------------------------------------------------------------- -init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> +init([ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), - %% this is bypassing the proxy so alarms can "jump the queue" and - %% be handled promptly rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - #ch{state = starting, - proxy_pid = ProxyPid, - reader_pid = ReaderPid, - writer_pid = WriterPid, - limiter_pid = undefined, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}. - -handle_message({method, Method, Content}, State) -> + {ok, #ch{state = starting, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new()}}. + +handle_call(_Request, _From, State) -> + noreply(State). + +handle_cast({method, Method, Content}, State) -> try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), - NewState; + noreply(NewState); {noreply, NewState} -> - NewState; + noreply(NewState); stop -> - exit(normal) + {stop, normal, State#ch{state = terminating}} catch exit:{amqp, Error, Explanation, none} -> - terminate({amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, - State); + {stop, {amqp, Error, Explanation, + rabbit_misc:method_record_type(Method)}, State}; exit:normal -> - terminate(normal, State); + {stop, normal, State}; _:Reason -> - terminate({Reason, erlang:get_stacktrace()}, State) + {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_message(terminate, State) -> - terminate(normal, State); +handle_cast(terminate, State) -> + {stop, normal, State}; -handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) -> +handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), - State; + noreply(State); -handle_message({deliver, ConsumerTag, AckRequired, Msg}, - State = #ch{proxy_pid = ProxyPid, - writer_pid = WriterPid, - next_tag = DeliveryTag}) -> +handle_cast({deliver, ConsumerTag, AckRequired, Msg}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), - ok = internal_deliver(WriterPid, ProxyPid, - true, ConsumerTag, DeliveryTag, Msg), - State1#ch{next_tag = DeliveryTag + 1}; + ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), + noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_message({conserve_memory, Conserve}, State) -> +handle_cast({conserve_memory, Conserve}, State) -> ok = rabbit_writer:send_command( State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - State; - -handle_message({'EXIT', Pid, Reason}, State = #ch{proxy_pid = Pid}) -> - terminate(Reason, State); + noreply(State). -handle_message({'EXIT', _Pid, normal}, State) -> - State; +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; -handle_message({'EXIT', _Pid, Reason}, State) -> - terminate(Reason, State); +handle_info(timeout, State) -> + %% TODO: Once we drop support for R11B-5, we can change this to + %% {noreply, State, hibernate}; + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]). -handle_message(Other, State) -> - terminate({unexpected_channel_message, Other}, State). - -%%--------------------------------------------------------------------------- +terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, + state = terminating}) -> + rabbit_writer:shutdown(WriterPid), + rabbit_limiter:shutdown(LimiterPid); terminate(Reason, State = #ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> @@ -174,8 +169,14 @@ terminate(Reason, State = #ch{writer_pid = WriterPid, _ -> ok end, rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid), - exit(Reason). + rabbit_limiter:shutdown(LimiterPid). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%--------------------------------------------------------------------------- + +noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -257,7 +258,6 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = notify_queues(internal_rollback(State)), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), - ok = rabbit_writer:shutdown(WriterPid), stop; handle_method(#'access.request'{},_, State) -> @@ -282,7 +282,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, content = DecodedContent, persistent_key = PersistentKey}, - rabbit_exchange:route(Exchange, RoutingKey), State)}; + rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -295,7 +295,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, true -> ok end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - Participants = ack(State#ch.proxy_pid, TxnKey, Acked), + Participants = ack(TxnKey, Acked), {noreply, case TxnKey of none -> ok = notify_limiter(State#ch.limiter_pid, Acked), State#ch{unacked_message_q = Remaining}; @@ -309,12 +309,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid, + _, State = #ch{ writer_pid = WriterPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), case rabbit_amqqueue:with_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of + fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -340,8 +340,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - reader_pid = ReaderPid, + _, State = #ch{ reader_pid = ReaderPid, limiter_pid = LimiterPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of @@ -360,7 +359,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, ProxyPid, LimiterPid, + Q, NoAck, ReaderPid, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -391,8 +390,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - consumer_mapping = ConsumerMapping }) -> + _, State = #ch{consumer_mapping = ConsumerMapping }) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -413,7 +411,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, %% cancel_ok ourselves it might overtake a %% message sent previously by the queue. rabbit_amqqueue:basic_cancel( - Q, ProxyPid, ConsumerTag, + Q, self(), ConsumerTag, ok_msg(NoWait, #'basic.cancel_ok'{ consumer_tag = ConsumerTag})) end) of @@ -433,13 +431,12 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{ limiter_pid = LimiterPid, - proxy_pid = ProxyPid }) -> + _, State = #ch{ limiter_pid = LimiterPid }) -> NewLimiterPid = case {LimiterPid, PrefetchCount} of {undefined, 0} -> undefined; {undefined, _} -> - LPid = rabbit_limiter:start_link(ProxyPid), + LPid = rabbit_limiter:start_link(self()), ok = limit_queues(LPid, State), LPid; {_, 0} -> @@ -454,7 +451,6 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, unacked_message_q = UAMQ }) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -463,14 +459,13 @@ handle_method(#'basic.recover'{requeue = true}, %% order. To keep it happy we reverse the id list %% since we are given them in reverse order. rabbit_amqqueue:requeue( - QPid, lists:reverse(MsgIds), ProxyPid) + QPid, lists:reverse(MsgIds), self()) end, ok, UAMQ), %% No answer required, apparently! {noreply, State#ch{unacked_message_q = queue:new()}}; handle_method(#'basic.recover'{requeue = false}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> lists:foreach( @@ -488,8 +483,7 @@ handle_method(#'basic.recover'{requeue = false}, %% %% FIXME: should we allocate a fresh DeliveryTag? ok = internal_deliver( - WriterPid, ProxyPid, - false, ConsumerTag, DeliveryTag, + WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) end, queue:to_list(UAMQ)), %% No answer required, apparently! @@ -778,10 +772,10 @@ add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> State#ch{tx_participants = sets:union(Participants, sets:from_list(MoreP))}. -ack(ProxyPid, TxnKey, UAQ) -> +ack(TxnKey, UAQ) -> fold_per_queue( fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid), + ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), [QPid | L] end, [], UAQ). @@ -835,11 +829,11 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), ProxyPid). +notify_queues(#ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). -limit_queues(LPid, #ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - rabbit_amqqueue:limit_all(consumer_queues(Consumers), ProxyPid, LPid). +limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). consumer_queues(Consumers) -> [QPid || QueueName <- @@ -855,7 +849,8 @@ consumer_queues(Consumers) -> %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for -%% messages sent in a response to a basic.get. +%% messages sent in a response to a basic.get (identified by their +%% 'none' consumer tag) notify_limiter(undefined, _Acked) -> ok; notify_limiter(LimiterPid, Acked) -> @@ -882,7 +877,7 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, +internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -894,6 +889,6 @@ internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, routing_key = RoutingKey}, ok = case Notify of true -> rabbit_writer:send_command_and_notify( - WriterPid, QPid, ChPid, M, Content); + WriterPid, QPid, self(), M, Content); false -> rabbit_writer:send_command(WriterPid, M, Content) end. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index ecc285a57f..cbc11b4031 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -57,7 +57,7 @@ start() -> true -> ok; false -> io:format("...done.~n") end, - init:stop(); + halt(); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> error("invalid command '~s'", [lists:flatten( @@ -138,7 +138,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results. The default value is \"/\". <QueueInfoItem> must be a member of the list [name, durable, auto_delete, -arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted, +arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted, consumers, transactions, memory]. The default is to display name and (number of) messages. @@ -148,7 +148,7 @@ auto_delete, arguments]. The default is to display name and type. The output format for \"list_bindings\" is a list of rows containing exchange name, routing key, queue name and arguments, in that order. -<ConnectionInfoItem> must be a member of the list [pid, address, port, +<ConnectionInfoItem> must be a member of the list [node, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display user, peer_address and peer_port. @@ -242,7 +242,8 @@ action(list_vhost_users, Node, Args = [_VHostPath], Inform) -> action(list_queues, Node, Args, Inform) -> Inform("Listing queues", []), {VHostArg, RemainingArgs} = parse_vhost_flag(Args), - ArgAtoms = default_if_empty(RemainingArgs, [name, messages]), + ArgAtoms = list_replace(node, pid, + default_if_empty(RemainingArgs, [name, messages])), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, [VHostArg, ArgAtoms]), ArgAtoms); @@ -267,7 +268,8 @@ action(list_bindings, Node, Args, Inform) -> action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), - ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port]), + ArgAtoms = list_replace(node, pid, + default_if_empty(Args, [user, peer_address, peer_port])), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms). @@ -308,9 +310,10 @@ format_info_item(Items, Key) -> case Info of {_, #resource{name = Name}} -> url_encode(Name); - {Key, IpAddress} when Key =:= address; Key =:= peer_address - andalso is_tuple(IpAddress) -> - inet_parse:ntoa(IpAddress); + _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) -> + inet_parse:ntoa(Value); + _ when is_pid(Value) -> + atom_to_list(node(Value)); _ when is_binary(Value) -> url_encode(Value); _ -> @@ -357,3 +360,6 @@ url_encode_char([], Acc) -> d2h(N) when N<10 -> N+$0; d2h(N) -> N+$a-10. +list_replace(Find, Replace, List) -> + [case X of Find -> Replace; _ -> X end || X <- List]. + diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 925c335cee..960e4945fe 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -37,11 +37,11 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, simple_publish/6, simple_publish/3, - route/2]). + route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_bindings_for_queue/1]). --export([check_type/1, assert_type/2, topic_matches/2]). +-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -77,7 +77,7 @@ (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). --spec(route/2 :: (exchange(), routing_key()) -> [pid()]). +-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -88,6 +88,7 @@ [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -145,6 +146,8 @@ check_type(<<"direct">>) -> direct; check_type(<<"topic">>) -> topic; +check_type(<<"headers">>) -> + headers; check_type(T) -> rabbit_misc:protocol_error( command_invalid, "invalid exchange type '~s'", [T]). @@ -211,54 +214,69 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, %% Usable by Erlang code that wants to publish messages. simple_publish(Mandatory, Immediate, Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey}) -> + routing_key = RoutingKey, + content = Content}) -> case lookup(ExchangeName) of {ok, Exchange} -> - QPids = route(Exchange, RoutingKey), + QPids = route(Exchange, RoutingKey, Content), rabbit_router:deliver(QPids, Mandatory, Immediate, none, Message); {error, Error} -> {error, Error} end. +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. %% %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. -%% +route(X = #exchange{type = topic}, RoutingKey, _Content) -> + match_bindings(X, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end); + +route(X = #exchange{type = headers}, _RoutingKey, Content) -> + Headers = case (Content#content.properties)#'P_basic'.headers of + undefined -> []; + H -> sort_arguments(H) + end, + match_bindings(X, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end); + +route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> + match_routing_key(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey, _Content) -> + match_routing_key(X, RoutingKey). + %% TODO: Maybe this should be handled by a cursor instead. -route(#exchange{name = Name, type = topic}, RoutingKey) -> - Query = qlc:q([QName || - #route{binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName, - key = BindingKey}} <- mnesia:table(route), - ExchangeName == Name, - %% TODO: This causes a full scan for each entry - %% with the same exchange (see bug 19336) - topic_matches(BindingKey, RoutingKey)]), +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(#exchange{name = Name}, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(route), + ExchangeName == Name, + Match(Binding)]), lookup_qpids( try mnesia:async_dirty(fun qlc:e/1, [Query]) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - [QName || #route{binding = #binding{queue_name = QName, - key = BindingKey}} <- + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- mnesia:dirty_match_object( #route{binding = #binding{exchange_name = Name, _ = '_'}}), - topic_matches(BindingKey, RoutingKey)] - end); - -route(X = #exchange{type = fanout}, _) -> - route_internal(X, '_'); - -route(X = #exchange{type = direct}, RoutingKey) -> - route_internal(X, RoutingKey). + Match(Binding)] + end). -route_internal(#exchange{name = Name}, RoutingKey) -> +match_routing_key(#exchange{name = Name}, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, queue_name = '$1', key = RoutingKey, @@ -377,7 +395,7 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> Binding = #binding{exchange_name = ExchangeName, queue_name = QueueName, key = RoutingKey, - args = Arguments}, + args = sort_arguments(Arguments)}, ok = case Durable of true -> Fun(durable_routes, #route{binding = Binding}, write); false -> ok @@ -429,6 +447,67 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, args = Args}. +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", + [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort (sort_arguments) that +%% route/3 and sync_binding/6 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " + "(value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, + AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK > DK -> + headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], + _AllMatch, AnyMatch, MatchKind) when PK < DK -> + headers_match(PRest, Data, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK == DK -> + {AllMatch1, AnyMatch1} = + if + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + PT == void -> {AllMatch, true}; + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + PT =/= DT -> {false, AnyMatch}; + PV == DV -> {AllMatch, true}; + true -> {false, AnyMatch} + end, + headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). + split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), KeySplit. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 7ecdb6fbb9..532be26d8e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -42,13 +42,15 @@ -ifdef(use_specs). +-type(maybe_pid() :: pid() | 'undefined'). + -spec(start_link/1 :: (pid()) -> pid()). --spec(shutdown/1 :: (pid()) -> 'ok'). --spec(limit/2 :: (pid(), non_neg_integer()) -> 'ok'). --spec(can_send/2 :: (pid(), pid()) -> bool()). --spec(ack/2 :: (pid(), non_neg_integer()) -> 'ok'). --spec(register/2 :: (pid(), pid()) -> 'ok'). --spec(unregister/2 :: (pid(), pid()) -> 'ok'). +-spec(shutdown/1 :: (maybe_pid()) -> 'ok'). +-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(can_send/2 :: (maybe_pid(), pid()) -> bool()). +-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). +-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). -endif. @@ -56,8 +58,11 @@ -record(lim, {prefetch_count = 0, ch_pid, - queues = dict:new(), + queues = dict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). +%% 'Notify' is a boolean that indicates whether a queue should be +%% notified of a change in the limit or volume that may allow it to +%% deliver more messages via the limiter's channel. %%---------------------------------------------------------------------------- %% API @@ -70,6 +75,7 @@ start_link(ChPid) -> shutdown(undefined) -> ok; shutdown(LimiterPid) -> + unlink(LimiterPid), gen_server2:cast(LimiterPid, shutdown). limit(undefined, 0) -> diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 7f6eaa8e93..5e8edd53a1 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -50,7 +50,7 @@ start() -> case catch action(Command, Args, RpcTimeout) of ok -> io:format("done.~n"), - init:stop(); + halt(); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> error("invalid command '~s'", [lists:flatten( diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 0b36a53cee..26d857bef0 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -110,7 +110,7 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, Txn, Message) -> R = rabbit_misc:upmap( fun ({Node, QPids}) -> - try gen_server:call( + try gen_server2:call( {?SERVER, Node}, {deliver, QPids, Mandatory, Immediate, Txn, Message}) catch |
