summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-10 17:08:25 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-10 17:08:25 +0100
commite8f690e23c4f350f6eb8dc9ccd8350a9439afc35 (patch)
tree3ae8a5a5c4a4653678c05b3f86ae59ca512e5801
parent6c013060af8af871693ed3377d0958bb5fb97990 (diff)
parent932be4e660f438eb35beaaf76c0388b62bd67da5 (diff)
downloadrabbitmq-server-git-e8f690e23c4f350f6eb8dc9ccd8350a9439afc35.tar.gz
just merging in default.
-rw-r--r--packaging/RPMS/Fedora/Makefile6
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec2
-rw-r--r--packaging/macports/net/rabbitmq-server/Portfile48
-rw-r--r--packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper23
-rw-r--r--packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper2
-rw-r--r--src/rabbit_amqqueue_process.erl35
-rw-r--r--src/rabbit_limiter.erl18
7 files changed, 84 insertions, 50 deletions
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 9fe91b98d2..c74d453361 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -13,12 +13,10 @@ endif
ifeq "x$(RPM_OS)" "xsuse"
REQUIRES=/sbin/chkconfig /sbin/service
-OS_DEFINES=--define '_initrddir /etc/init.d'
-RELEASE_OS=.suse
+OS_DEFINES=--define '_initrddir /etc/init.d' --define 'dist .suse'
else
REQUIRES=chkconfig initscripts
OS_DEFINES=--define '_initrddir /etc/rc.d/init.d'
-RELEASE_OS=
endif
rpms: clean server
@@ -27,7 +25,7 @@ prepare:
mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp
cp $(TOP_DIR)/$(TARBALL) SOURCES
cp rabbitmq-server.spec SPECS
- sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|;s|%%RELEASE_OS%%|$(RELEASE_OS)|' \
+ sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \
SPECS/rabbitmq-server.spec
cp init.d SOURCES/rabbitmq-server.init
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 3c3be609ce..875381e807 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -2,7 +2,7 @@
Name: rabbitmq-server
Version: %%VERSION%%
-Release: 1%%RELEASE_OS%%
+Release: 1%{?dist}
License: MPLv1.1
Group: Development/Libraries
Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz
diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile
index 659132568f..805bc3fd89 100644
--- a/packaging/macports/net/rabbitmq-server/Portfile
+++ b/packaging/macports/net/rabbitmq-server/Portfile
@@ -32,6 +32,8 @@ set serverhome ${prefix}/var/lib/rabbitmq
set logdir ${prefix}/var/log/rabbitmq
set mnesiadbdir ${prefix}/var/lib/rabbitmq/mnesia
set plistloc ${prefix}/etc/LaunchDaemons/org.macports.rabbitmq-server
+set sbindir ${destroot}${prefix}/lib/rabbitmq/bin
+set wrappersbin ${destroot}${prefix}/sbin
use_configure no
@@ -41,7 +43,7 @@ build.args PYTHON=${prefix}/bin/python2.5
destroot.destdir \
TARGET_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \
- SBIN_DIR=${destroot}${prefix}/sbin \
+ SBIN_DIR=${sbindir} \
MAN_DIR=${destroot}${prefix}/share/man
destroot.keepdirs \
@@ -59,32 +61,36 @@ post-destroot {
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir}
reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
reinplace -E "s:(RABBITMQ_CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
reinplace -E "s:(RABBITMQ_LOG_BASE)=/:\\1=${prefix}/:" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
reinplace -E "s:(RABBITMQ_MNESIA_BASE)=/:\\1=${prefix}/:" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
reinplace -E "s:(RABBITMQ_PIDS_FILE)=/:\\1=${prefix}/:" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
- file rename ${destroot}${prefix}/sbin/rabbitmqctl ${destroot}${prefix}/sbin/rabbitmqctl_real
- xinstall -m 555 ${filespath}/rabbitmqctl_wrapper ${destroot}${prefix}/sbin
- file rename ${destroot}${prefix}/sbin/rabbitmqctl_wrapper ${destroot}${prefix}/sbin/rabbitmqctl
+ xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
+ ${wrappersbin}/rabbitmq-multi
+
+ reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \
+ ${wrappersbin}/rabbitmq-multi
+ reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \
+ ${wrappersbin}/rabbitmq-multi
+ file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server
+ file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl
- reinplace -E "s:@PREFIX@:${prefix}:" \
- ${destroot}${prefix}/sbin/rabbitmqctl
}
pre-install {
diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper
new file mode 100644
index 0000000000..296a77d19c
--- /dev/null
+++ b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Escape spaces and quotes, because shell is revolting.
+for arg in "$@" ; do
+ # Escape quotes in parameters, so that they're passed through cleanly.
+ arg=$(sed -e 's/"/\\"/' <<-END
+ $arg
+ END
+ )
+ CMDLINE="${CMDLINE} \"${arg}\""
+done
+
+cd /var/lib/rabbitmq
+
+SCRIPT=`basename $0`
+
+if [ `id -u` = 0 ] ; then
+ su rabbitmq -s /bin/sh -c "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}"
+else
+ /usr/lib/rabbitmq/bin/${SCRIPT}
+ echo -e "\nOnly root should run ${SCRIPT}\n"
+ exit 1
+fi
+
diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper
deleted file mode 100644
index 1996811eb5..0000000000
--- a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper
+++ /dev/null
@@ -1,2 +0,0 @@
-#!/bin/bash
-exec sudo -H -u rabbitmq "@PREFIX@/sbin/rabbitmqctl_real" "$@"
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 81dea02789..f45f931e81 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -151,7 +151,7 @@ all_ch_record() ->
[C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
- Limited orelse Count > ?UNSENT_MESSAGE_LIMIT.
+ Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
ch_record_state_transition(OldCR, NewCR) ->
BlockedOld = is_ch_blocked(OldCR),
@@ -178,12 +178,8 @@ deliver_queue(Fun, FunAcc0,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
IsMsgReady = Fun(is_message_ready, FunAcc0, State),
- case IsMsgReady
- andalso
- ( (not AckRequired)
- orelse
- rabbit_limiter:can_send( LimiterPid, self() )
- ) of
+ case (IsMsgReady andalso
+ rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of
true ->
case Fun(AckRequired, FunAcc0, State) of
{empty, FunAcc1, State2} ->
@@ -611,7 +607,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
reply({error, exclusive_consume_unavailable}, State);
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
- Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not(NoAck)},
store_ch_record(C#cr{consumers = [Consumer | Consumers],
limiter_pid = LimiterPid}),
if Consumers == [] ->
@@ -619,15 +616,23 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
true ->
ok
end,
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
+ end,
State1 = State#q{has_had_consumers = true,
- exclusive_consumer =
- if
- ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
- end,
- round_robin = queue:in({ChPid, Consumer}, RoundRobin)},
+ exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
- reply(ok, run_message_queue(State1))
+ State2 =
+ case is_ch_blocked(C) of
+ true -> State1;
+ false -> run_message_queue(
+ State1 #q {
+ round_robin = queue:in(
+ {ChPid, Consumer},
+ RoundRobin)})
+ end,
+ reply(ok, State2)
end
end;
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 3f9b6ebb9b..9f3dcbd071 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -36,7 +36,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/1, shutdown/1]).
--export([limit/2, can_send/2, ack/2, register/2, unregister/2]).
+-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
%%----------------------------------------------------------------------------
@@ -47,7 +47,7 @@
-spec(start_link/1 :: (pid()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
--spec(can_send/2 :: (maybe_pid(), pid()) -> bool()).
+-spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
@@ -85,12 +85,13 @@ limit(LimiterPid, PrefetchCount) ->
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
-can_send(undefined, _QPid) ->
+can_send(undefined, _QPid, _AckRequired) ->
true;
-can_send(LimiterPid, QPid) ->
+can_send(LimiterPid, QPid, AckRequired) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end).
+ fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired},
+ infinity) end).
%% Let the limiter know that the channel has received some acks from a
%% consumer
@@ -110,10 +111,13 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid})
init([ChPid]) ->
{ok, #lim{ch_pid = ChPid} }.
-handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) ->
+handle_call({can_send, QPid, AckRequired}, _From,
+ State = #lim{volume = Volume}) ->
case limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = Volume + 1}}
+ false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end}}
end.
handle_cast(shutdown, State) ->