diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-04-15 16:19:32 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-04-15 16:19:32 +0100 |
| commit | 1f5f030b9394759f2a1f55e07d12eb89742cfc32 (patch) | |
| tree | 083c1c2c4a24a5790b4f2cf4aa28e2a5e7068884 | |
| parent | 8a6d284c9adeec24d93e009b3e4ceb9063883bf2 (diff) | |
| parent | eddc7dc7b9c93abb8e9f1c24f0ce6e92ab0a4bc3 (diff) | |
| download | rabbitmq-server-git-1f5f030b9394759f2a1f55e07d12eb89742cfc32.tar.gz | |
merge default into bug20354
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/init.d | 1 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 13 | ||||
| -rw-r--r-- | packaging/common/rabbitmq-script-wrapper | 2 | ||||
| -rwxr-xr-x[-rw-r--r--] | packaging/debs/Debian/debian/copyright | 27 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/init.d | 1 | ||||
| -rwxr-xr-x | scripts/rabbitmq-multi | 14 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 24 | ||||
| -rwxr-xr-x | scripts/rabbitmqctl | 5 | ||||
| -rwxr-xr-x | scripts/rabbitmqctl.bat | 2 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_alarm.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 67 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_guid.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_persister.erl | 54 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 3 |
20 files changed, 166 insertions, 109 deletions
@@ -133,7 +133,7 @@ srcdist: distclean sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ - cp codegen.py Makefile $(TARGET_SRC_DIR) + cp codegen.py Makefile generate_app $(TARGET_SRC_DIR) cp -r scripts $(TARGET_SRC_DIR) cp -r docs $(TARGET_SRC_DIR) diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d index d624e7c7af..a9155f3b03 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/RPMS/Fedora/init.d @@ -33,7 +33,6 @@ fi RETVAL=0 set -e -cd / start_rabbitmq () { set +e diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 3695c6906d..9a67a3410d 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -24,6 +24,7 @@ scalable implementation of an AMQP broker. %define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version} %define _rabbit_libdir %{_libdir}/rabbitmq +%define _rabbit_wrapper %{_builddir}/`basename %{S:2}` %define _maindir %{buildroot}%{_rabbit_erllibdir} @@ -36,10 +37,10 @@ fi %prep %setup -q -sed -i 's|/usr/lib/|%{_libdir}/|' %{S:1} -sed -i 's|/usr/lib/|%{_libdir}/|' %{S:2} %build +cp %{S:2} %{_rabbit_wrapper} +sed 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper} make %{?_smp_mflags} %install @@ -54,9 +55,9 @@ mkdir -p %{buildroot}%{_localstatedir}/log/rabbitmq #Copy all necessary lib files etc. install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server -install -p -D -m 0755 %{S:2} %{buildroot}%{_sbindir}/rabbitmqctl -install -p -D -m 0755 %{S:2} %{buildroot}%{_sbindir}/rabbitmq-server -install -p -D -m 0755 %{S:2} %{buildroot}%{_sbindir}/rabbitmq-multi +install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl +install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server +install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server @@ -100,8 +101,6 @@ fi %defattr(-,root,root,-) %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq -%dir %{_localstatedir}/lib/rabbitmq -%dir %{_localstatedir}/log/rabbitmq %dir %{_sysconfdir}/rabbitmq %{_rabbit_erllibdir} %{_rabbit_libdir} diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index 217d1658b5..296a77d19c 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -9,7 +9,7 @@ for arg in "$@" ; do CMDLINE="${CMDLINE} \"${arg}\"" done -cd / +cd /var/lib/rabbitmq SCRIPT=`basename $0` diff --git a/packaging/debs/Debian/debian/copyright b/packaging/debs/Debian/debian/copyright index 854db2900f..69867220f0 100644..100755 --- a/packaging/debs/Debian/debian/copyright +++ b/packaging/debs/Debian/debian/copyright @@ -3,9 +3,30 @@ Wed, 3 Jan 2007 15:43:44 +0000. It was downloaded from http://www.rabbitmq.com/ -codegen/amqp-0.8.json is released under the MIT License and is -Copyright (C) 2008-2009 LShift Ltd, Cohesive Financial Technologies -LLC, and Rabbit Technologies Ltd. +The file codegen/amqp-0.8.json is covered by the following terms: + + "Copyright (C) 2008-2009 LShift Ltd, Cohesive Financial Technologies LLC, + and Rabbit Technologies Ltd + + Permission is hereby granted, free of charge, to any person + obtaining a copy of this file (the Software), to deal in the + Software without restriction, including without limitation the + rights to use, copy, modify, merge, publish, distribute, + sublicense, and/or sell copies of the Software, and to permit + persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + OTHER DEALINGS IN THE SOFTWARE." The rest of this package is licensed under the Mozilla Public License 1.1 Authors and Copyright are as described below: diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d index ef66add5a5..a35a60ec68 100644 --- a/packaging/debs/Debian/debian/init.d +++ b/packaging/debs/Debian/debian/init.d @@ -26,7 +26,6 @@ fi RETVAL=0 set -e -cd / start_rabbitmq () { set +e diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 4cf0703a4c..1d0c785f6b 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -29,23 +29,23 @@ ## ## Contributor(s): ______________________________________. ## +NODENAME=rabbit +NODE_IP_ADDRESS=0.0.0.0 +NODE_PORT=5672 +SCRIPT_HOME=$(dirname $0) +PIDS_FILE=/var/lib/rabbitmq/pids +MULTI_ERL_ARGS= +MULTI_START_ARGS= [ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=rabbit [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=0.0.0.0 [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=5672 [ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=${SCRIPT_HOME} -[ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=$(dirname $0) [ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=${PIDS_FILE} -[ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=/var/lib/rabbitmq/pids [ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=${MULTI_ERL_ARGS} -[ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS= [ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=${MULTI_START_ARGS} -[ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS= export \ RABBITMQ_NODENAME \ diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 6273804f56..d5be944a27 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -30,28 +30,30 @@ ## Contributor(s): ______________________________________. ## +NODENAME=rabbit +NODE_IP_ADDRESS=0.0.0.0 +NODE_PORT=5672 +SERVER_ERL_ARGS="+K true +A30 \ +-kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \ +-kernel inet_default_connect_options [{nodelay,true}]" +CLUSTER_CONFIG_FILE=/etc/rabbitmq/cluster.config +LOG_BASE=/var/log/rabbitmq +MNESIA_BASE=/var/lib/rabbitmq/mnesia +SERVER_START_ARGS= + [ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=rabbit [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=0.0.0.0 [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=5672 [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} -[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS="+K true +A30 \ --kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \ --kernel inet_default_connect_options [{nodelay,true}]" [ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} -[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config [ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE} -[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=/var/log/rabbitmq [ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE} -[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=/var/lib/rabbitmq/mnesia +[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} + [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} -[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} -[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS= ## Log rotation [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index b941b85005..c57978c050 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -30,10 +30,15 @@ ## Contributor(s): ______________________________________. ## +[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf + +[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} + exec erl \ -pa "`dirname $0`/../ebin" \ -noinput \ -hidden \ + ${RABBITMQ_CTL_ERL_ARGS} \ -sname rabbitmqctl$$ \ -s rabbit_control \ -extra "$@" diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 33a10777d0..e4dccfba64 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -46,4 +46,4 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -sname rabbitmqctl -s rabbit_control -extra %*
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -extra %*
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index da0ab9cf7a..54348d9a1c 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -192,7 +192,7 @@ delete_user(Username) -> fun () -> ok = mnesia:delete({rabbit_user, Username}), [ok = mnesia:delete_object( - rabbit_user_permissions, R, write) || + rabbit_user_permission, R, write) || R <- mnesia:match_object( rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 875624ba55..21999f16c3 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -78,7 +78,8 @@ stop() -> register(Pid, HighMemMFA) -> ok = gen_event:call(alarm_handler, ?MODULE, - {register, Pid, HighMemMFA}). + {register, Pid, HighMemMFA}, + infinity). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 59b4d039e7..06bb18f5d3 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -124,19 +124,32 @@ recover() -> recover_durable_queues() -> Node = node(), - %% TODO: use dirty ops instead - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + lists:foreach( + fun (RecoveredQ) -> + Q = start_queue_process(RecoveredQ), + %% We need to catch the case where a client connected to + %% another node has deleted the queue (and possibly + %% re-created it). + case rabbit_misc:execute_mnesia_transaction( + fun () -> case mnesia:match_object( + rabbit_durable_queue, RecoveredQ, read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end + end) of + true -> ok; + false -> exit(Q#amqqueue.pid, shutdown) + end + end, + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node])) - end), - Queues = lists:map(fun start_queue_process/1, R), - rabbit_misc:execute_mnesia_transaction( - fun () -> - lists:foreach(fun store_queue/1, Queues), - ok - end). + node(Pid) == Node])) + end)), + ok. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -200,10 +213,10 @@ list(VHostPath) -> map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server2:pcall(QPid, 9, info). + gen_server2:pcall(QPid, 9, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server2:pcall(QPid, 9, {info, Items}) of + case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -212,20 +225,20 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server2:call(QPid, {delete, IfUnused, IfEmpty}). + gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity). -purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message}); + gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity); deliver(true, _IsImmediate, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver, Txn, Message}), + gen_server2:call(QPid, {deliver, Txn, Message}, infinity), true; deliver(false, _IsImmediate, Txn, Message, QPid) -> gen_server2:cast(QPid, {deliver, Txn, Message}), @@ -241,10 +254,9 @@ ack(QPid, Txn, MsgIds, ChPid) -> gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> - Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, QPids). rollback_all(QPids, Txn) -> @@ -254,12 +266,11 @@ rollback_all(QPids, Txn) -> QPids). notify_down_all(QPids, ChPid) -> - Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, - fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, QPids). limit_all(QPids, ChPid, LimiterPid) -> @@ -269,18 +280,20 @@ limit_all(QPids, ChPid, LimiterPid) -> QPids). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server2:call(QPid, {claim_queue, ReaderPid}). + gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server2:call(QPid, {basic_get, ChPid, NoAck}). + gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, + infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, + infinity). notify_sent(QPid, ChPid) -> gen_server2:cast(QPid, {notify_sent, ChPid}). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 19efd9fc22..a57e8076bf 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -103,24 +103,17 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. recover() -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:foldl( - fun (Exchange, Acc) -> - ok = mnesia:write(rabbit_exchange, Exchange, write), - Acc - end, ok, rabbit_durable_exchange), - mnesia:foldl( - fun (Route, Acc) -> - {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, - Route, write), - ok = mnesia:write(rabbit_reverse_route, - ReverseRoute, write), - Acc - end, ok, rabbit_durable_route), - ok - end). + ok = rabbit_misc:table_foreach( + fun(Exchange) -> ok = mnesia:write(rabbit_exchange, + Exchange, write) + end, rabbit_durable_exchange), + ok = rabbit_misc:table_foreach( + fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, + Route, write), + ok = mnesia:write(rabbit_reverse_route, + ReverseRoute, write) + end, rabbit_durable_route). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 51c1665bbb..2be005034e 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -82,7 +82,8 @@ guid() -> %% and time. We combine that with a process-local counter to give %% us a GUID that is monotonically increasing per process. G = case get(guid) of - undefined -> {{gen_server:call(?SERVER, serial), self()}, 0}; + undefined -> {{gen_server:call(?SERVER, serial, infinity), self()}, + 0}; {S, I} -> {S, I+1} end, put(guid, G), diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 532be26d8e..3f9b6ebb9b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -90,7 +90,7 @@ can_send(undefined, _QPid) -> can_send(LimiterPid, QPid) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid}) end). + fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 5d176f8fac..de7bc010b2 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,6 +46,7 @@ -export([ensure_ok/2]). -export([localnode/1, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). +-export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). @@ -98,6 +99,7 @@ -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). +-spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok'). -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). @@ -298,6 +300,21 @@ map_in_order(F, L) -> lists:reverse( lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). +%% For each entry in a table, execute a function in a transaction. +%% This is often far more efficient than wrapping a tx around the lot. +%% +%% We ignore entries that have been modified or removed. +table_foreach(F, TableName) -> + lists:foreach( + fun (E) -> execute_mnesia_transaction( + fun () -> case mnesia:match_object(TableName, E, read) of + [] -> ok; + _ -> F(E) + end + end) + end, dirty_read_all(TableName)), + ok. + dirty_read_all(TableName) -> mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 94033a4f3d..f4fa45993a 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -49,6 +49,8 @@ -define(LOG_BUNDLE_DELAY, 5). -define(COMPLETE_BUNDLE_DELAY, 2). +-define(HIBERNATE_AFTER, 10000). + -define(MAX_WRAP_ENTRIES, 500). -define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}). @@ -93,7 +95,7 @@ start_link() -> transaction(MessageList) -> ?LOGDEBUG("transaction ~p~n", [MessageList]), TxnKey = rabbit_guid:guid(), - gen_server:call(?SERVER, {transaction, TxnKey, MessageList}). + gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity). extend_transaction(TxnKey, MessageList) -> ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]), @@ -105,17 +107,17 @@ dirty_work(MessageList) -> commit_transaction(TxnKey) -> ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]), - gen_server:call(?SERVER, {commit_transaction, TxnKey}). + gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity). rollback_transaction(TxnKey) -> ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]), gen_server:cast(?SERVER, {rollback_transaction, TxnKey}). force_snapshot() -> - gen_server:call(?SERVER, force_snapshot). + gen_server:call(?SERVER, force_snapshot, infinity). serial() -> - gen_server:call(?SERVER, serial). + gen_server:call(?SERVER, serial, infinity). %%-------------------------------------------------------------------- @@ -164,10 +166,8 @@ handle_call({transaction, Key, MessageList}, From, State) -> do_noreply(internal_commit(From, Key, NewState)); handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); -handle_call(force_snapshot, _From, State = #pstate{log_handle = LH, - snapshot = Snapshot}) -> - ok = take_snapshot(LH, Snapshot), - do_reply(ok, State); +handle_call(force_snapshot, _From, State) -> + do_reply(ok, flush(true, State)); handle_call(serial, _From, State = #pstate{snapshot = #psnapshot{serial = Serial}}) -> do_reply(Serial, State); @@ -183,8 +183,13 @@ handle_cast({extend_transaction, TxnKey, MessageList}, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info(timeout, State = #pstate{deadline = infinity}) -> + State1 = flush(true, State), + %% TODO: Once we drop support for R11B-5, we can change this to + %% {noreply, State1, hibernate}; + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); handle_info(timeout, State) -> - {noreply, flush(State)}; + do_noreply(flush(State)); handle_info(_Info, State) -> {noreply, State}. @@ -275,12 +280,13 @@ take_snapshot_and_save_old(LogHandle, Snapshot) -> rabbit_log:info("Saving persister log in ~p~n", [OldFileName]), ok = take_snapshot(LogHandle, OldFileName, Snapshot). -maybe_take_snapshot(State = #pstate{entry_count = EntryCount, log_handle = LH, - snapshot = Snapshot}) - when EntryCount >= ?MAX_WRAP_ENTRIES -> +maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount, + log_handle = LH, + snapshot = Snapshot}) + when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES -> ok = take_snapshot(LH, Snapshot), State#pstate{entry_count = 0}; -maybe_take_snapshot(State) -> +maybe_take_snapshot(_Force, State) -> State. later_ms(DeltaMilliSec) -> @@ -298,7 +304,7 @@ compute_deadline(_TimerDelay, ExistingDeadline) -> ExistingDeadline. compute_timeout(infinity) -> - infinity; + ?HIBERNATE_AFTER; compute_timeout(Deadline) -> DeltaMilliSec = time_diff(Deadline, now()) * 1000.0, if @@ -314,18 +320,18 @@ do_noreply(State = #pstate{deadline = Deadline}) -> do_reply(Reply, State = #pstate{deadline = Deadline}) -> {reply, Reply, State, compute_timeout(Deadline)}. -flush(State = #pstate{pending_logs = PendingLogs, - pending_replies = Waiting, - log_handle = LogHandle}) -> - State1 = if - PendingLogs /= [] -> +flush(State) -> flush(false, State). + +flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, + pending_replies = Waiting, + log_handle = LogHandle}) -> + State1 = if PendingLogs /= [] -> disk_log:alog(LogHandle, lists:reverse(PendingLogs)), - maybe_take_snapshot( - State#pstate{ - entry_count = State#pstate.entry_count + 1}); - true -> + State#pstate{entry_count = State#pstate.entry_count + 1}; + true -> State end, + State2 = maybe_take_snapshot(ForceSnapshot, State1), if Waiting /= [] -> ok = disk_log:sync(LogHandle), lists:foreach(fun (From) -> gen_server:reply(From, ok) end, @@ -333,7 +339,7 @@ flush(State = #pstate{pending_logs = PendingLogs, true -> ok end, - State1#pstate{deadline = infinity, + State2#pstate{deadline = infinity, pending_logs = [], pending_replies = []}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9f642e3540..ef8038e7aa 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -161,10 +161,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. info(Pid) -> - gen_server:call(Pid, info). + gen_server:call(Pid, info, infinity). info(Pid, Items) -> - case gen_server:call(Pid, {info, Items}) of + case gen_server:call(Pid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ff42ea0460..0b06a063a7 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -112,7 +112,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, fun ({Node, QPids}) -> try gen_server2:call( {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}) + {deliver, QPids, Mandatory, Immediate, Txn, Message}, + infinity) catch _Class:_Reason -> %% TODO: figure out what to log (and do!) here |
