summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-04-15 16:19:32 +0100
committerMatthias Radestock <matthias@lshift.net>2009-04-15 16:19:32 +0100
commit1f5f030b9394759f2a1f55e07d12eb89742cfc32 (patch)
tree083c1c2c4a24a5790b4f2cf4aa28e2a5e7068884
parent8a6d284c9adeec24d93e009b3e4ceb9063883bf2 (diff)
parenteddc7dc7b9c93abb8e9f1c24f0ce6e92ab0a4bc3 (diff)
downloadrabbitmq-server-git-1f5f030b9394759f2a1f55e07d12eb89742cfc32.tar.gz
merge default into bug20354
-rw-r--r--Makefile2
-rw-r--r--packaging/RPMS/Fedora/init.d1
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec13
-rw-r--r--packaging/common/rabbitmq-script-wrapper2
-rwxr-xr-x[-rw-r--r--]packaging/debs/Debian/debian/copyright27
-rw-r--r--packaging/debs/Debian/debian/init.d1
-rwxr-xr-xscripts/rabbitmq-multi14
-rwxr-xr-xscripts/rabbitmq-server24
-rwxr-xr-xscripts/rabbitmqctl5
-rwxr-xr-xscripts/rabbitmqctl.bat2
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_alarm.erl3
-rw-r--r--src/rabbit_amqqueue.erl67
-rw-r--r--src/rabbit_exchange.erl29
-rw-r--r--src/rabbit_guid.erl3
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_misc.erl17
-rw-r--r--src/rabbit_persister.erl54
-rw-r--r--src/rabbit_reader.erl4
-rw-r--r--src/rabbit_router.erl3
20 files changed, 166 insertions, 109 deletions
diff --git a/Makefile b/Makefile
index 81ac568731..47aa586c8c 100644
--- a/Makefile
+++ b/Makefile
@@ -133,7 +133,7 @@ srcdist: distclean
sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
- cp codegen.py Makefile $(TARGET_SRC_DIR)
+ cp codegen.py Makefile generate_app $(TARGET_SRC_DIR)
cp -r scripts $(TARGET_SRC_DIR)
cp -r docs $(TARGET_SRC_DIR)
diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d
index d624e7c7af..a9155f3b03 100644
--- a/packaging/RPMS/Fedora/init.d
+++ b/packaging/RPMS/Fedora/init.d
@@ -33,7 +33,6 @@ fi
RETVAL=0
set -e
-cd /
start_rabbitmq () {
set +e
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 3695c6906d..9a67a3410d 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -24,6 +24,7 @@ scalable implementation of an AMQP broker.
%define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version}
%define _rabbit_libdir %{_libdir}/rabbitmq
+%define _rabbit_wrapper %{_builddir}/`basename %{S:2}`
%define _maindir %{buildroot}%{_rabbit_erllibdir}
@@ -36,10 +37,10 @@ fi
%prep
%setup -q
-sed -i 's|/usr/lib/|%{_libdir}/|' %{S:1}
-sed -i 's|/usr/lib/|%{_libdir}/|' %{S:2}
%build
+cp %{S:2} %{_rabbit_wrapper}
+sed 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper}
make %{?_smp_mflags}
%install
@@ -54,9 +55,9 @@ mkdir -p %{buildroot}%{_localstatedir}/log/rabbitmq
#Copy all necessary lib files etc.
install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server
-install -p -D -m 0755 %{S:2} %{buildroot}%{_sbindir}/rabbitmqctl
-install -p -D -m 0755 %{S:2} %{buildroot}%{_sbindir}/rabbitmq-server
-install -p -D -m 0755 %{S:2} %{buildroot}%{_sbindir}/rabbitmq-multi
+install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl
+install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server
+install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi
install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server
@@ -100,8 +101,6 @@ fi
%defattr(-,root,root,-)
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq
-%dir %{_localstatedir}/lib/rabbitmq
-%dir %{_localstatedir}/log/rabbitmq
%dir %{_sysconfdir}/rabbitmq
%{_rabbit_erllibdir}
%{_rabbit_libdir}
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index 217d1658b5..296a77d19c 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -9,7 +9,7 @@ for arg in "$@" ; do
CMDLINE="${CMDLINE} \"${arg}\""
done
-cd /
+cd /var/lib/rabbitmq
SCRIPT=`basename $0`
diff --git a/packaging/debs/Debian/debian/copyright b/packaging/debs/Debian/debian/copyright
index 854db2900f..69867220f0 100644..100755
--- a/packaging/debs/Debian/debian/copyright
+++ b/packaging/debs/Debian/debian/copyright
@@ -3,9 +3,30 @@ Wed, 3 Jan 2007 15:43:44 +0000.
It was downloaded from http://www.rabbitmq.com/
-codegen/amqp-0.8.json is released under the MIT License and is
-Copyright (C) 2008-2009 LShift Ltd, Cohesive Financial Technologies
-LLC, and Rabbit Technologies Ltd.
+The file codegen/amqp-0.8.json is covered by the following terms:
+
+ "Copyright (C) 2008-2009 LShift Ltd, Cohesive Financial Technologies LLC,
+ and Rabbit Technologies Ltd
+
+ Permission is hereby granted, free of charge, to any person
+ obtaining a copy of this file (the Software), to deal in the
+ Software without restriction, including without limitation the
+ rights to use, copy, modify, merge, publish, distribute,
+ sublicense, and/or sell copies of the Software, and to permit
+ persons to whom the Software is furnished to do so, subject to
+ the following conditions:
+
+ The above copyright notice and this permission notice shall be
+ included in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ OTHER DEALINGS IN THE SOFTWARE."
The rest of this package is licensed under the Mozilla Public License 1.1
Authors and Copyright are as described below:
diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d
index ef66add5a5..a35a60ec68 100644
--- a/packaging/debs/Debian/debian/init.d
+++ b/packaging/debs/Debian/debian/init.d
@@ -26,7 +26,6 @@ fi
RETVAL=0
set -e
-cd /
start_rabbitmq () {
set +e
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 4cf0703a4c..1d0c785f6b 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -29,23 +29,23 @@
##
## Contributor(s): ______________________________________.
##
+NODENAME=rabbit
+NODE_IP_ADDRESS=0.0.0.0
+NODE_PORT=5672
+SCRIPT_HOME=$(dirname $0)
+PIDS_FILE=/var/lib/rabbitmq/pids
+MULTI_ERL_ARGS=
+MULTI_START_ARGS=
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
-[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=rabbit
[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
-[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=5672
[ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=${SCRIPT_HOME}
-[ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=$(dirname $0)
[ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=${PIDS_FILE}
-[ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=/var/lib/rabbitmq/pids
[ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=${MULTI_ERL_ARGS}
-[ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=
[ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=${MULTI_START_ARGS}
-[ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=
export \
RABBITMQ_NODENAME \
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 6273804f56..d5be944a27 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -30,28 +30,30 @@
## Contributor(s): ______________________________________.
##
+NODENAME=rabbit
+NODE_IP_ADDRESS=0.0.0.0
+NODE_PORT=5672
+SERVER_ERL_ARGS="+K true +A30 \
+-kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \
+-kernel inet_default_connect_options [{nodelay,true}]"
+CLUSTER_CONFIG_FILE=/etc/rabbitmq/cluster.config
+LOG_BASE=/var/log/rabbitmq
+MNESIA_BASE=/var/lib/rabbitmq/mnesia
+SERVER_START_ARGS=
+
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
-[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=rabbit
[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
-[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=5672
[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
-[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS="+K true +A30 \
--kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \
--kernel inet_default_connect_options [{nodelay,true}]"
[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE}
-[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config
[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE}
-[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=/var/log/rabbitmq
[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE}
-[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=/var/lib/rabbitmq/mnesia
+[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS}
+
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR}
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}
-[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS}
-[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=
## Log rotation
[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index b941b85005..c57978c050 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -30,10 +30,15 @@
## Contributor(s): ______________________________________.
##
+[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
+
+[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
+
exec erl \
-pa "`dirname $0`/../ebin" \
-noinput \
-hidden \
+ ${RABBITMQ_CTL_ERL_ARGS} \
-sname rabbitmqctl$$ \
-s rabbit_control \
-extra "$@"
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 33a10777d0..e4dccfba64 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -46,4 +46,4 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -sname rabbitmqctl -s rabbit_control -extra %*
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -extra %*
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index da0ab9cf7a..54348d9a1c 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -192,7 +192,7 @@ delete_user(Username) ->
fun () ->
ok = mnesia:delete({rabbit_user, Username}),
[ok = mnesia:delete_object(
- rabbit_user_permissions, R, write) ||
+ rabbit_user_permission, R, write) ||
R <- mnesia:match_object(
rabbit_user_permission,
#user_permission{user_vhost = #user_vhost{
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 875624ba55..21999f16c3 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -78,7 +78,8 @@ stop() ->
register(Pid, HighMemMFA) ->
ok = gen_event:call(alarm_handler, ?MODULE,
- {register, Pid, HighMemMFA}).
+ {register, Pid, HighMemMFA},
+ infinity).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 59b4d039e7..06bb18f5d3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -124,19 +124,32 @@ recover() ->
recover_durable_queues() ->
Node = node(),
- %% TODO: use dirty ops instead
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ lists:foreach(
+ fun (RecoveredQ) ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client connected to
+ %% another node has deleted the queue (and possibly
+ %% re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(
+ rabbit_durable_queue, RecoveredQ, read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true -> ok;
+ false -> exit(Q#amqqueue.pid, shutdown)
+ end
+ end,
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
<- mnesia:table(rabbit_durable_queue),
- node(Pid) == Node]))
- end),
- Queues = lists:map(fun start_queue_process/1, R),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun store_queue/1, Queues),
- ok
- end).
+ node(Pid) == Node]))
+ end)),
+ ok.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -200,10 +213,10 @@ list(VHostPath) ->
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- gen_server2:pcall(QPid, 9, info).
+ gen_server2:pcall(QPid, 9, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case gen_server2:pcall(QPid, 9, {info, Items}) of
+ case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -212,20 +225,20 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
-stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat).
+stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
stat_all() ->
lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- gen_server2:call(QPid, {delete, IfUnused, IfEmpty}).
+ gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity).
-purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge).
+purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
deliver(_IsMandatory, true, Txn, Message, QPid) ->
- gen_server2:call(QPid, {deliver_immediately, Txn, Message});
+ gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity);
deliver(true, _IsImmediate, Txn, Message, QPid) ->
- gen_server2:call(QPid, {deliver, Txn, Message}),
+ gen_server2:call(QPid, {deliver, Txn, Message}, infinity),
true;
deliver(false, _IsImmediate, Txn, Message, QPid) ->
gen_server2:cast(QPid, {deliver, Txn, Message}),
@@ -241,10 +254,9 @@ ack(QPid, Txn, MsgIds, ChPid) ->
gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
- Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, Timeout) end,
+ fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end,
QPids).
rollback_all(QPids, Txn) ->
@@ -254,12 +266,11 @@ rollback_all(QPids, Txn) ->
QPids).
notify_down_all(QPids, ChPid) ->
- Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
%% we don't care if the queue process has terminated in the
%% meantime
fun (_) -> ok end,
- fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end,
+ fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
QPids).
limit_all(QPids, ChPid, LimiterPid) ->
@@ -269,18 +280,20 @@ limit_all(QPids, ChPid, LimiterPid) ->
QPids).
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- gen_server2:call(QPid, {claim_queue, ReaderPid}).
+ gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- gen_server2:call(QPid, {basic_get, ChPid, NoAck}).
+ gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
+ infinity).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
+ infinity).
notify_sent(QPid, ChPid) ->
gen_server2:cast(QPid, {notify_sent, ChPid}).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 19efd9fc22..a57e8076bf 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -103,24 +103,17 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
recover() ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- mnesia:foldl(
- fun (Exchange, Acc) ->
- ok = mnesia:write(rabbit_exchange, Exchange, write),
- Acc
- end, ok, rabbit_durable_exchange),
- mnesia:foldl(
- fun (Route, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route,
- Route, write),
- ok = mnesia:write(rabbit_reverse_route,
- ReverseRoute, write),
- Acc
- end, ok, rabbit_durable_route),
- ok
- end).
+ ok = rabbit_misc:table_foreach(
+ fun(Exchange) -> ok = mnesia:write(rabbit_exchange,
+ Exchange, write)
+ end, rabbit_durable_exchange),
+ ok = rabbit_misc:table_foreach(
+ fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(rabbit_route,
+ Route, write),
+ ok = mnesia:write(rabbit_reverse_route,
+ ReverseRoute, write)
+ end, rabbit_durable_route).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 51c1665bbb..2be005034e 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -82,7 +82,8 @@ guid() ->
%% and time. We combine that with a process-local counter to give
%% us a GUID that is monotonically increasing per process.
G = case get(guid) of
- undefined -> {{gen_server:call(?SERVER, serial), self()}, 0};
+ undefined -> {{gen_server:call(?SERVER, serial, infinity), self()},
+ 0};
{S, I} -> {S, I+1}
end,
put(guid, G),
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 532be26d8e..3f9b6ebb9b 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -90,7 +90,7 @@ can_send(undefined, _QPid) ->
can_send(LimiterPid, QPid) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid}) end).
+ fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end).
%% Let the limiter know that the channel has received some acks from a
%% consumer
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 5d176f8fac..de7bc010b2 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,6 +46,7 @@
-export([ensure_ok/2]).
-export([localnode/1, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
+-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
@@ -98,6 +99,7 @@
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
+-spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok').
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
@@ -298,6 +300,21 @@ map_in_order(F, L) ->
lists:reverse(
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
+%% For each entry in a table, execute a function in a transaction.
+%% This is often far more efficient than wrapping a tx around the lot.
+%%
+%% We ignore entries that have been modified or removed.
+table_foreach(F, TableName) ->
+ lists:foreach(
+ fun (E) -> execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(TableName, E, read) of
+ [] -> ok;
+ _ -> F(E)
+ end
+ end)
+ end, dirty_read_all(TableName)),
+ ok.
+
dirty_read_all(TableName) ->
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 94033a4f3d..f4fa45993a 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -49,6 +49,8 @@
-define(LOG_BUNDLE_DELAY, 5).
-define(COMPLETE_BUNDLE_DELAY, 2).
+-define(HIBERNATE_AFTER, 10000).
+
-define(MAX_WRAP_ENTRIES, 500).
-define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}).
@@ -93,7 +95,7 @@ start_link() ->
transaction(MessageList) ->
?LOGDEBUG("transaction ~p~n", [MessageList]),
TxnKey = rabbit_guid:guid(),
- gen_server:call(?SERVER, {transaction, TxnKey, MessageList}).
+ gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity).
extend_transaction(TxnKey, MessageList) ->
?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]),
@@ -105,17 +107,17 @@ dirty_work(MessageList) ->
commit_transaction(TxnKey) ->
?LOGDEBUG("commit_transaction ~p~n", [TxnKey]),
- gen_server:call(?SERVER, {commit_transaction, TxnKey}).
+ gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity).
rollback_transaction(TxnKey) ->
?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]),
gen_server:cast(?SERVER, {rollback_transaction, TxnKey}).
force_snapshot() ->
- gen_server:call(?SERVER, force_snapshot).
+ gen_server:call(?SERVER, force_snapshot, infinity).
serial() ->
- gen_server:call(?SERVER, serial).
+ gen_server:call(?SERVER, serial, infinity).
%%--------------------------------------------------------------------
@@ -164,10 +166,8 @@ handle_call({transaction, Key, MessageList}, From, State) ->
do_noreply(internal_commit(From, Key, NewState));
handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
-handle_call(force_snapshot, _From, State = #pstate{log_handle = LH,
- snapshot = Snapshot}) ->
- ok = take_snapshot(LH, Snapshot),
- do_reply(ok, State);
+handle_call(force_snapshot, _From, State) ->
+ do_reply(ok, flush(true, State));
handle_call(serial, _From,
State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
do_reply(Serial, State);
@@ -183,8 +183,13 @@ handle_cast({extend_transaction, TxnKey, MessageList}, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
+handle_info(timeout, State = #pstate{deadline = infinity}) ->
+ State1 = flush(true, State),
+ %% TODO: Once we drop support for R11B-5, we can change this to
+ %% {noreply, State1, hibernate};
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
handle_info(timeout, State) ->
- {noreply, flush(State)};
+ do_noreply(flush(State));
handle_info(_Info, State) ->
{noreply, State}.
@@ -275,12 +280,13 @@ take_snapshot_and_save_old(LogHandle, Snapshot) ->
rabbit_log:info("Saving persister log in ~p~n", [OldFileName]),
ok = take_snapshot(LogHandle, OldFileName, Snapshot).
-maybe_take_snapshot(State = #pstate{entry_count = EntryCount, log_handle = LH,
- snapshot = Snapshot})
- when EntryCount >= ?MAX_WRAP_ENTRIES ->
+maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
+ log_handle = LH,
+ snapshot = Snapshot})
+ when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES ->
ok = take_snapshot(LH, Snapshot),
State#pstate{entry_count = 0};
-maybe_take_snapshot(State) ->
+maybe_take_snapshot(_Force, State) ->
State.
later_ms(DeltaMilliSec) ->
@@ -298,7 +304,7 @@ compute_deadline(_TimerDelay, ExistingDeadline) ->
ExistingDeadline.
compute_timeout(infinity) ->
- infinity;
+ ?HIBERNATE_AFTER;
compute_timeout(Deadline) ->
DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
if
@@ -314,18 +320,18 @@ do_noreply(State = #pstate{deadline = Deadline}) ->
do_reply(Reply, State = #pstate{deadline = Deadline}) ->
{reply, Reply, State, compute_timeout(Deadline)}.
-flush(State = #pstate{pending_logs = PendingLogs,
- pending_replies = Waiting,
- log_handle = LogHandle}) ->
- State1 = if
- PendingLogs /= [] ->
+flush(State) -> flush(false, State).
+
+flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
+ pending_replies = Waiting,
+ log_handle = LogHandle}) ->
+ State1 = if PendingLogs /= [] ->
disk_log:alog(LogHandle, lists:reverse(PendingLogs)),
- maybe_take_snapshot(
- State#pstate{
- entry_count = State#pstate.entry_count + 1});
- true ->
+ State#pstate{entry_count = State#pstate.entry_count + 1};
+ true ->
State
end,
+ State2 = maybe_take_snapshot(ForceSnapshot, State1),
if Waiting /= [] ->
ok = disk_log:sync(LogHandle),
lists:foreach(fun (From) -> gen_server:reply(From, ok) end,
@@ -333,7 +339,7 @@ flush(State = #pstate{pending_logs = PendingLogs,
true ->
ok
end,
- State1#pstate{deadline = infinity,
+ State2#pstate{deadline = infinity,
pending_logs = [],
pending_replies = []}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 9f642e3540..ef8038e7aa 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -161,10 +161,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
info(Pid) ->
- gen_server:call(Pid, info).
+ gen_server:call(Pid, info, infinity).
info(Pid, Items) ->
- case gen_server:call(Pid, {info, Items}) of
+ case gen_server:call(Pid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index ff42ea0460..0b06a063a7 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -112,7 +112,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate,
fun ({Node, QPids}) ->
try gen_server2:call(
{?SERVER, Node},
- {deliver, QPids, Mandatory, Immediate, Txn, Message})
+ {deliver, QPids, Mandatory, Immediate, Txn, Message},
+ infinity)
catch
_Class:_Reason ->
%% TODO: figure out what to log (and do!) here