summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile10
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--packaging/RPMS/Fedora/Makefile6
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec5
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/debs/Debian/debian/control4
-rw-r--r--packaging/macports/net/rabbitmq-server/Portfile68
-rw-r--r--packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper13
-rw-r--r--packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper2
-rw-r--r--src/rabbit.erl4
-rw-r--r--src/rabbit_access_control.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl271
-rw-r--r--src/rabbit_exchange.erl24
-rw-r--r--src/rabbit_limiter.erl18
-rw-r--r--src/rabbit_log.erl12
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_tests.erl4
17 files changed, 244 insertions, 211 deletions
diff --git a/Makefile b/Makefile
index ef0fa4320c..c0e0c55a9f 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,11 @@
+ifndef TMPDIR
+TMPDIR := /tmp
+endif
+
RABBITMQ_NODENAME=rabbit
RABBITMQ_SERVER_START_ARGS=
-RABBITMQ_MNESIA_DIR=/tmp/rabbitmq-$(RABBITMQ_NODENAME)-mnesia
-RABBITMQ_LOG_BASE=/tmp
+RABBITMQ_MNESIA_DIR=$(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia
+RABBITMQ_LOG_BASE=$(TMPDIR)
SOURCE_DIR=src
EBIN_DIR=ebin
@@ -69,7 +73,7 @@ clean: cleandb
rm -f docs/*.[0-9].gz
cleandb: stop-node
- erl -mnesia dir '"$(RABBITMQ_MNESIA_DIR)"' -noshell -eval 'lists:foreach(fun file:delete/1, filelib:wildcard(mnesia:system_info(directory) ++ "/*")), halt().'
+ rm -rf $(RABBITMQ_MNESIA_DIR)/*
############ various tasks to interact with RabbitMQ ###################
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 5be074920a..8e1c890eb2 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -11,6 +11,8 @@
rabbit_sup,
rabbit_tcp_client_sup]},
{applications, [kernel, stdlib, sasl, mnesia, os_mon]},
+%% we also depend on ssl but it shouldn't be in here as we don't
+%% actually want to start it
{mod, {rabbit, []}},
{env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
{extra_startup_steps, []},
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 9fe91b98d2..c74d453361 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -13,12 +13,10 @@ endif
ifeq "x$(RPM_OS)" "xsuse"
REQUIRES=/sbin/chkconfig /sbin/service
-OS_DEFINES=--define '_initrddir /etc/init.d'
-RELEASE_OS=.suse
+OS_DEFINES=--define '_initrddir /etc/init.d' --define 'dist .suse'
else
REQUIRES=chkconfig initscripts
OS_DEFINES=--define '_initrddir /etc/rc.d/init.d'
-RELEASE_OS=
endif
rpms: clean server
@@ -27,7 +25,7 @@ prepare:
mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp
cp $(TOP_DIR)/$(TARBALL) SOURCES
cp rabbitmq-server.spec SPECS
- sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|;s|%%RELEASE_OS%%|$(RELEASE_OS)|' \
+ sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \
SPECS/rabbitmq-server.spec
cp init.d SOURCES/rabbitmq-server.init
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 3c3be609ce..9e7c4bfb69 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -2,7 +2,7 @@
Name: rabbitmq-server
Version: %%VERSION%%
-Release: 1%%RELEASE_OS%%
+Release: 1%{?dist}
License: MPLv1.1
Group: Development/Libraries
Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz
@@ -117,6 +117,9 @@ fi
rm -rf %{buildroot}
%changelog
+* Wed Jun 17 2009 Matthias Radestock <matthias@lshift.net> 1.6.0-1
+- New upstream release
+
* Tue May 19 2009 Matthias Radestock <matthias@lshift.net> 1.5.5-1
- Maintenance release for the 1.5.x series
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 7c5673f77b..ac94c8a318 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (1.6.0-1) hardy; urgency=low
+
+ * New Upstream Release
+
+ -- Matthias Radestock <matthias@lshift.net> Tue, 16 Jun 2009 15:02:58 +0100
+
rabbitmq-server (1.5.5-1) hardy; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index 216360725d..d4e2cd1763 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,12 +2,12 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com>
-Build-Depends: cdbs, debhelper (>= 5), erlang-nox, erlang-dev, python-simplejson
+Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson
Standards-Version: 3.8.0
Package: rabbitmq-server
Architecture: all
-Depends: erlang-nox, erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
+Depends: erlang-base | erlang-base-hipe, erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
Description: An AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile
index 659132568f..b8096d206d 100644
--- a/packaging/macports/net/rabbitmq-server/Portfile
+++ b/packaging/macports/net/rabbitmq-server/Portfile
@@ -3,7 +3,7 @@
PortSystem 1.0
name rabbitmq-server
-version 1.5.5
+version 1.6.0
revision 0
categories net
maintainers tonyg@rabbitmq.com
@@ -19,9 +19,9 @@ homepage http://www.rabbitmq.com/
master_sites http://www.rabbitmq.com/releases/rabbitmq-server/v${version}/
checksums \
- md5 3242a67885c2471b5ab62254bf024679 \
- sha1 f4d6a01eaa2c74fa32f567fe410d21d9be1b43aa \
- rmd160 1a1c4b97d765548028c161d1617905151ca9e040
+ md5 af3b0d868d58e5aefb4f0837b82ca010 \
+ sha1 1834c670d076fa9878223aacaa35a5a6528f1d86 \
+ rmd160 d6c9de4e1fb48c6ceb1cb5d717ca2afb5e3266fe
depends_build port:erlang port:py25-simplejson
depends_run port:erlang
@@ -32,6 +32,8 @@ set serverhome ${prefix}/var/lib/rabbitmq
set logdir ${prefix}/var/log/rabbitmq
set mnesiadbdir ${prefix}/var/lib/rabbitmq/mnesia
set plistloc ${prefix}/etc/LaunchDaemons/org.macports.rabbitmq-server
+set sbindir ${destroot}${prefix}/lib/rabbitmq/bin
+set wrappersbin ${destroot}${prefix}/sbin
use_configure no
@@ -41,7 +43,7 @@ build.args PYTHON=${prefix}/bin/python2.5
destroot.destdir \
TARGET_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \
- SBIN_DIR=${destroot}${prefix}/sbin \
+ SBIN_DIR=${sbindir} \
MAN_DIR=${destroot}${prefix}/share/man
destroot.keepdirs \
@@ -59,32 +61,36 @@ post-destroot {
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir}
reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
- reinplace -E "s:(RABBITMQ_CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
- reinplace -E "s:(RABBITMQ_LOG_BASE)=/:\\1=${prefix}/:" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
- reinplace -E "s:(RABBITMQ_MNESIA_BASE)=/:\\1=${prefix}/:" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
- reinplace -E "s:(RABBITMQ_PIDS_FILE)=/:\\1=${prefix}/:" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
-
- file rename ${destroot}${prefix}/sbin/rabbitmqctl ${destroot}${prefix}/sbin/rabbitmqctl_real
- xinstall -m 555 ${filespath}/rabbitmqctl_wrapper ${destroot}${prefix}/sbin
- file rename ${destroot}${prefix}/sbin/rabbitmqctl_wrapper ${destroot}${prefix}/sbin/rabbitmqctl
-
- reinplace -E "s:@PREFIX@:${prefix}:" \
- ${destroot}${prefix}/sbin/rabbitmqctl
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
+ reinplace -E "s:(CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
+ reinplace -E "s:(LOG_BASE)=/:\\1=${prefix}/:" \
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
+ reinplace -E "s:(MNESIA_BASE)=/:\\1=${prefix}/:" \
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
+ reinplace -E "s:(PIDS_FILE)=/:\\1=${prefix}/:" \
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
+
+ xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
+ ${wrappersbin}/rabbitmq-multi
+
+ reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \
+ ${wrappersbin}/rabbitmq-multi
+ reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \
+ ${wrappersbin}/rabbitmq-multi
+ file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server
+ file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl
+
}
pre-install {
diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper
new file mode 100644
index 0000000000..0d7118c476
--- /dev/null
+++ b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper
@@ -0,0 +1,13 @@
+#!/bin/bash
+cd /var/lib/rabbitmq
+
+SCRIPT=`basename $0`
+
+if [ `id -u` = 0 ] ; then
+ sudo -u rabbitmq -H /usr/lib/rabbitmq/bin/${SCRIPT} "$@"
+else
+ /usr/lib/rabbitmq/bin/${SCRIPT}
+ echo -e "\nOnly root should run ${SCRIPT}\n"
+ exit 1
+fi
+
diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper
deleted file mode 100644
index 1996811eb5..0000000000
--- a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper
+++ /dev/null
@@ -1,2 +0,0 @@
-#!/bin/bash
-exec sudo -H -u rabbitmq "@PREFIX@/sbin/rabbitmqctl_real" "$@"
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 1ddb515173..196212eaee 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -136,7 +136,7 @@ start(normal, []) ->
ok = rabbit_amqqueue:start(),
- {ok, MemoryAlarms} = application:get_env(memory_alarms),
+ {ok, MemoryAlarms} = application:get_env(memory_alarms),
ok = rabbit_alarm:start(MemoryAlarms),
ok = rabbit_binary_generator:
@@ -304,7 +304,7 @@ rotate_logs(File, Suffix, OldHandler, NewHandler) ->
log_rotation_result({error, MainLogError}, {error, SaslLogError}) ->
{error, {{cannot_rotate_main_logs, MainLogError},
- {cannot_rotate_sasl_logs, SaslLogError}}};
+ {cannot_rotate_sasl_logs, SaslLogError}}};
log_rotation_result({error, MainLogError}, ok) ->
{error, {cannot_rotate_main_logs, MainLogError}};
log_rotation_result(ok, {error, SaslLogError}) ->
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 08bc5fc370..eda747b287 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -245,8 +245,8 @@ add_vhost(VHostPath) ->
[{<<"">>, direct},
{<<"amq.direct">>, direct},
{<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
{<<"amq.fanout">>, fanout}]],
ok;
[_] ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6027c9c04c..cf0ef44f5c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -53,14 +53,15 @@
has_had_consumers,
next_msg_id,
message_buffer,
- round_robin}).
+ active_consumers,
+ blocked_consumers}).
-record(consumer, {tag, ack_required}).
-record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}).
%% These are held in our process dictionary
--record(cr, {consumers,
+-record(cr, {consumer_count,
ch_pid,
limiter_pid,
monitor_ref,
@@ -99,7 +100,8 @@ init(Q) ->
has_had_consumers = false,
next_msg_id = 1,
message_buffer = queue:new(),
- round_robin = queue:new()}, ?HIBERNATE_AFTER}.
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -129,7 +131,7 @@ ch_record(ChPid) ->
case get(Key) of
undefined ->
MonitorRef = erlang:monitor(process, ChPid),
- C = #cr{consumers = [],
+ C = #cr{consumer_count = 0,
ch_pid = ChPid,
monitor_ref = MonitorRef,
unacked_messages = dict:new(),
@@ -148,7 +150,7 @@ all_ch_record() ->
[C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
- Limited orelse Count > ?UNSENT_MESSAGE_LIMIT.
+ Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
ch_record_state_transition(OldCR, NewCR) ->
BlockedOld = is_ch_blocked(OldCR),
@@ -165,18 +167,18 @@ record_current_channel_tx(ChPid, Txn) ->
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
- round_robin = RoundRobin,
+ active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
- case queue:out(RoundRobin) of
+ case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
- RoundRobinTail} ->
+ ActiveConsumersTail} ->
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- case not(AckRequired) orelse rabbit_limiter:can_send(
- LimiterPid, self()) of
+ case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of
true ->
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
@@ -188,18 +190,32 @@ deliver_immediately(Message, Delivered,
NewC = C#cr{unsent_message_count = Count + 1,
unacked_messages = NewUAM},
store_ch_record(NewC),
- NewConsumers =
+ {NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block -> block_consumers(ChPid, RoundRobinTail)
+ ok -> {queue:in(QEntry, ActiveConsumersTail),
+ BlockedConsumers};
+ block ->
+ {ActiveConsumers1, BlockedConsumers1} =
+ move_consumers(ChPid,
+ ActiveConsumersTail,
+ BlockedConsumers),
+ {ActiveConsumers1,
+ queue:in(QEntry, BlockedConsumers1)}
end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId + 1}};
+ {offered, AckRequired,
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers,
+ next_msg_id = NextId + 1}};
false ->
store_ch_record(C#cr{is_limit_active = true}),
- NewConsumers = block_consumers(ChPid, RoundRobinTail),
- deliver_immediately(Message, Delivered,
- State#q{round_robin = NewConsumers})
+ {NewActiveConsumers, NewBlockedConsumers} =
+ move_consumers(ChPid,
+ ActiveConsumers,
+ BlockedConsumers),
+ deliver_immediately(
+ Message, Delivered,
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers})
end;
{empty, _} ->
{not_offered, State}
@@ -235,22 +251,24 @@ deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)),
State).
-block_consumers(ChPid, RoundRobin) ->
- %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]),
- queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(RoundRobin))).
-
-unblock_consumers(ChPid, Consumers, RoundRobin) ->
- %%?LOGDEBUG("Unblocking ~p ~p ~p~n", [ChPid, Consumers, queue:to_list(RoundRobin)]),
- queue:join(RoundRobin,
- queue:from_list([{ChPid, Con} || Con <- Consumers])).
+add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
-block_consumer(ChPid, ConsumerTag, RoundRobin) ->
- %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ConsumerTag, queue:to_list(RoundRobin)]),
+remove_consumer(ChPid, ConsumerTag, Queue) ->
+ %% TODO: replace this with queue:filter/2 once we move to R12
queue:from_list(lists:filter(
fun ({CP, #consumer{tag = CT}}) ->
(CP /= ChPid) or (CT /= ConsumerTag)
- end, queue:to_list(RoundRobin))).
+ end, queue:to_list(Queue))).
+
+remove_consumers(ChPid, Queue) ->
+ %% TODO: replace this with queue:filter/2 once we move to R12
+ queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end,
+ queue:to_list(Queue))).
+
+move_consumers(ChPid, From, To) ->
+ {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
+ queue:to_list(From)),
+ {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -261,50 +279,25 @@ possibly_unblock(State, ChPid, Update) ->
store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
ok -> State;
- unblock -> NewRR = unblock_consumers(ChPid,
- NewC#cr.consumers,
- State#q.round_robin),
- run_poke_burst(State#q{round_robin = NewRR})
+ unblock -> {NewBlockedeConsumers, NewActiveConsumers} =
+ move_consumers(ChPid,
+ State#q.blocked_consumers,
+ State#q.active_consumers),
+ run_poke_burst(
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedeConsumers})
end
end.
-check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) ->
- {continue, State};
-check_auto_delete(State = #q{has_had_consumers = false}) ->
- {continue, State};
-check_auto_delete(State = #q{round_robin = RoundRobin}) ->
- % The clauses above rule out cases where no-one has consumed from
- % this queue yet, and cases where we are not an auto_delete queue
- % in any case. Thus it remains to check whether we have any active
- % listeners at this point.
- case queue:is_empty(RoundRobin) of
- true ->
- % There are no waiting listeners. It's possible that we're
- % completely unused. Check.
- case is_unused() of
- true ->
- % There are no active consumers at this
- % point. This is the signal to autodelete.
- {stop, State};
- false ->
- % There is at least one active consumer, so we
- % shouldn't delete ourselves.
- {continue, State}
- end;
- false ->
- % There are some waiting listeners, thus we are not
- % unused, so can continue life as normal without needing
- % to check the process dictionary.
- {continue, State}
- end.
+should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
+should_auto_delete(#q{has_had_consumers = false}) -> false;
+should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
- round_robin = ActiveConsumers}) ->
+handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
not_found -> noreply(State);
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
unacked_messages = UAM} ->
- NewActive = block_consumers(ChPid, ActiveConsumers),
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
case Txn of
@@ -312,20 +305,22 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
_ -> ok = rollback_work(Txn, qname(State)),
erase_tx(Txn)
end,
- case check_auto_delete(
- deliver_or_enqueue_n(
- [{Message, true} ||
- {_Messsage_id, Message} <- dict:to_list(UAM)],
- State#q{
- exclusive_consumer = case Holder of
- {ChPid, _} -> none;
- Other -> Other
- end,
- round_robin = NewActive})) of
- {continue, NewState} ->
- noreply(NewState);
- {stop, NewState} ->
- {stop, normal, NewState}
+ NewState =
+ deliver_or_enqueue_n(
+ [{Message, true} ||
+ {_Messsage_id, Message} <- dict:to_list(UAM)],
+ State#q{
+ exclusive_consumer = case Holder of
+ {ChPid, _} -> none;
+ Other -> Other
+ end,
+ active_consumers = remove_consumers(
+ ChPid, State#q.active_consumers),
+ blocked_consumers = remove_consumers(
+ ChPid, State#q.blocked_consumers)}),
+ case should_auto_delete(NewState) of
+ false -> noreply(NewState);
+ true -> {stop, normal, NewState}
end
end.
@@ -338,12 +333,12 @@ check_queue_owner(none, _) -> ok;
check_queue_owner({ReaderPid, _}, ReaderPid) -> ok;
check_queue_owner({_, _}, _) -> mismatch.
-check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume) ->
+check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
-check_exclusive_access(none, false) ->
+check_exclusive_access(none, false, _State) ->
ok;
-check_exclusive_access(none, true) ->
- case is_unused() of
+check_exclusive_access(none, true, State) ->
+ case is_unused(State) of
true -> ok;
false -> in_use
end.
@@ -368,16 +363,8 @@ run_poke_burst(MessageBuffer, State) ->
State#q{message_buffer = MessageBuffer}
end.
-is_unused() ->
- is_unused1(get()).
-
-is_unused1([]) ->
- true;
-is_unused1([{{ch, _}, #cr{consumers = Consumers}} | _Rest])
- when Consumers /= [] ->
- false;
-is_unused1([_ | Rest]) ->
- is_unused1(Rest).
+is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
+ queue:is_empty(State#q.blocked_consumers).
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
@@ -536,9 +523,8 @@ i(messages, State) ->
i(acks_uncommitted, _) ->
lists:sum([length(Pending) ||
#tx{pending_acks = Pending} <- all_tx_record()]);
-i(consumers, _) ->
- lists:sum([length(Consumers) ||
- #cr{consumers = Consumers} <- all_ch_record()]);
+i(consumers, State) ->
+ queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
i(transactions, _) ->
length(all_tx_record());
i(memory, _) ->
@@ -620,78 +606,91 @@ handle_call({basic_get, ChPid, NoAck}, _From,
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{owner = Owner,
- exclusive_consumer = ExistingHolder,
- round_robin = RoundRobin}) ->
+ exclusive_consumer = ExistingHolder}) ->
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
reply({error, queue_owned_by_another_connection}, State);
ok ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume) of
+ case check_exclusive_access(ExistingHolder, ExclusiveConsume,
+ State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = #cr{consumers = Consumers} = ch_record(ChPid),
- Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
- store_ch_record(C#cr{consumers = [Consumer | Consumers],
+ C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not(NoAck)},
+ store_ch_record(C#cr{consumer_count = ConsumerCount +1,
limiter_pid = LimiterPid}),
- if Consumers == [] ->
+ if ConsumerCount == 0 ->
ok = rabbit_limiter:register(LimiterPid, self());
true ->
ok
end,
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
+ end,
State1 = State#q{has_had_consumers = true,
- exclusive_consumer =
- if
- ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
- end,
- round_robin = queue:in({ChPid, Consumer}, RoundRobin)},
+ exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
- reply(ok, run_poke_burst(State1))
+ State2 =
+ case is_ch_blocked(C) of
+ true -> State1#q{
+ blocked_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.blocked_consumers)};
+ false -> run_poke_burst(
+ State1#q{
+ active_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.active_consumers)})
+ end,
+ reply(ok, State2)
end
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
- State = #q{exclusive_consumer = Holder,
- round_robin = RoundRobin}) ->
+ State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(ChPid) of
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumers = Consumers, limiter_pid = LimiterPid} ->
- NewConsumers = lists:filter
- (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
- Consumers),
- store_ch_record(C#cr{consumers = NewConsumers}),
- if NewConsumers == [] ->
+ C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} ->
+ store_ch_record(C#cr{consumer_count = ConsumerCount - 1}),
+ if ConsumerCount == 1 ->
ok = rabbit_limiter:unregister(LimiterPid, self());
true ->
ok
end,
ok = maybe_send_reply(ChPid, OkMsg),
- case check_auto_delete(
- State#q{exclusive_consumer = cancel_holder(ChPid,
- ConsumerTag,
- Holder),
- round_robin = block_consumer(ChPid,
- ConsumerTag,
- RoundRobin)}) of
- {continue, State1} ->
- reply(ok, State1);
- {stop, State1} ->
- {stop, normal, ok, State1}
+ NewState =
+ State#q{exclusive_consumer = cancel_holder(ChPid,
+ ConsumerTag,
+ Holder),
+ active_consumers = remove_consumer(
+ ChPid, ConsumerTag,
+ State#q.active_consumers),
+ blocked_consumers = remove_consumer(
+ ChPid, ConsumerTag,
+ State#q.blocked_consumers)},
+ case should_auto_delete(NewState) of
+ false -> reply(ok, NewState);
+ true -> {stop, normal, ok, NewState}
end
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
- round_robin = RoundRobin}) ->
- reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
+ active_consumers = ActiveConsumers}) ->
+ reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
+ State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->
IsEmpty = queue:is_empty(MessageBuffer),
- IsUnused = is_unused(),
+ IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) ->
reply({error, not_empty}, State);
@@ -710,7 +709,7 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
exclusive_consumer = Holder}) ->
case Owner of
none ->
- case check_exclusive_access(Holder, true) of
+ case check_exclusive_access(Holder, true, State) of
in_use ->
%% FIXME: Is this really the right answer? What if
%% an active consumer's reader is actually the
@@ -786,10 +785,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
possibly_unblock(
State, ChPid,
- fun (C = #cr{consumers = Consumers,
+ fun (C = #cr{consumer_count = ConsumerCount,
limiter_pid = OldLimiterPid,
is_limit_active = Limited}) ->
- if Consumers =/= [] andalso OldLimiterPid == undefined ->
+ if ConsumerCount =/= 0 andalso OldLimiterPid == undefined ->
ok = rabbit_limiter:register(LimiterPid, self());
true ->
ok
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index e9161e8ad4..f17ee2f530 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -232,9 +232,9 @@ route(X = #exchange{type = topic}, RoutingKey, _Content) ->
route(X = #exchange{type = headers}, _RoutingKey, Content) ->
Headers = case (Content#content.properties)#'P_basic'.headers of
- undefined -> [];
- H -> sort_arguments(H)
- end,
+ undefined -> [];
+ H -> sort_arguments(H)
+ end,
match_bindings(X, fun (#binding{args = Spec}) ->
headers_match(Spec, Headers)
end);
@@ -467,14 +467,14 @@ parse_x_match(Other) ->
%%
headers_match(Pattern, Data) ->
MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
- {value, {_, longstr, MK}} -> parse_x_match(MK);
- {value, {_, Type, MK}} ->
- rabbit_log:warning("Invalid x-match field type ~p "
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
"(value ~p); expected longstr",
- [Type, MK]),
- default_headers_match_kind();
- _ -> default_headers_match_kind()
- end,
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
headers_match(Pattern, Data, true, false, MatchKind).
headers_match([], _Data, AllMatch, _AnyMatch, all) ->
@@ -501,8 +501,8 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
%% the corresponding data field. I've interpreted that to
%% mean a type of "void" for the pattern field.
PT == void -> {AllMatch, true};
- %% Similarly, it's not specified, but I assume that a
- %% mismatched type causes a mismatched value.
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
PT =/= DT -> {false, AnyMatch};
PV == DV -> {AllMatch, true};
true -> {false, AnyMatch}
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 3f9b6ebb9b..9f3dcbd071 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -36,7 +36,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/1, shutdown/1]).
--export([limit/2, can_send/2, ack/2, register/2, unregister/2]).
+-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
%%----------------------------------------------------------------------------
@@ -47,7 +47,7 @@
-spec(start_link/1 :: (pid()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
--spec(can_send/2 :: (maybe_pid(), pid()) -> bool()).
+-spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
@@ -85,12 +85,13 @@ limit(LimiterPid, PrefetchCount) ->
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
-can_send(undefined, _QPid) ->
+can_send(undefined, _QPid, _AckRequired) ->
true;
-can_send(LimiterPid, QPid) ->
+can_send(LimiterPid, QPid, AckRequired) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end).
+ fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired},
+ infinity) end).
%% Let the limiter know that the channel has received some acks from a
%% consumer
@@ -110,10 +111,13 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid})
init([ChPid]) ->
{ok, #lim{ch_pid = ChPid} }.
-handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) ->
+handle_call({can_send, QPid, AckRequired}, _From,
+ State = #lim{volume = Volume}) ->
case limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = Volume + 1}}
+ false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end}}
end.
handle_cast(shutdown, State) ->
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index f408336e94..dd5b498b07 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -75,7 +75,7 @@ debug(Fmt, Args) when is_list(Args) ->
message(Direction, Channel, MethodRecord, Content) ->
gen_server:cast(?SERVER,
- {message, Direction, Channel, MethodRecord, Content}).
+ {message, Direction, Channel, MethodRecord, Content}).
info(Fmt) ->
gen_server:cast(?SERVER, {info, Fmt}).
@@ -112,11 +112,11 @@ handle_cast({debug, Fmt, Args}, State) ->
{noreply, State};
handle_cast({message, Direction, Channel, MethodRecord, Content}, State) ->
io:format("~s ch~p ~p~n",
- [case Direction of
- in -> "-->";
- out -> "<--" end,
- Channel,
- {MethodRecord, Content}]),
+ [case Direction of
+ in -> "-->";
+ out -> "<--" end,
+ Channel,
+ {MethodRecord, Content}]),
{noreply, State};
handle_cast({info, Fmt}, State) ->
error_logger:info_msg(Fmt),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f0d9033d07..6a7d68084d 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -231,7 +231,7 @@ start_connection(Parent, Deb, ClientSock) ->
connection_state = pre_init},
handshake, 8))
catch
- Ex -> (if Ex == connection_closed_abruptly ->
+ Ex -> (if Ex == connection_closed_abruptly ->
fun rabbit_log:warning/2;
true ->
fun rabbit_log:error/2
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8f0a3a8973..01757509ec 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -261,7 +261,7 @@ test_log_management() ->
%% original log files are not writable
ok = make_files_non_writable([MainLog, SaslLog]),
{error, {{cannot_rotate_main_logs, _},
- {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []),
+ {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []),
%% logging directed to tty (handlers were removed in last test)
ok = clean_logs([MainLog, SaslLog], Suffix),
@@ -280,7 +280,7 @@ test_log_management() ->
ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}),
ok = application:set_env(kernel, error_logger, {file, MainLog}),
ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog},
- {rabbit_sasl_report_file_h, SaslLog}]),
+ {rabbit_sasl_report_file_h, SaslLog}]),
passed.
test_log_management_during_startup() ->