diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-10 17:08:25 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-10 17:08:25 +0100 |
| commit | e8f690e23c4f350f6eb8dc9ccd8350a9439afc35 (patch) | |
| tree | 3ae8a5a5c4a4653678c05b3f86ae59ca512e5801 | |
| parent | 6c013060af8af871693ed3377d0958bb5fb97990 (diff) | |
| parent | 932be4e660f438eb35beaaf76c0388b62bd67da5 (diff) | |
| download | rabbitmq-server-git-e8f690e23c4f350f6eb8dc9ccd8350a9439afc35.tar.gz | |
just merging in default.
| -rw-r--r-- | packaging/RPMS/Fedora/Makefile | 6 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 2 | ||||
| -rw-r--r-- | packaging/macports/net/rabbitmq-server/Portfile | 48 | ||||
| -rw-r--r-- | packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper | 23 | ||||
| -rw-r--r-- | packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 35 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 18 |
7 files changed, 84 insertions, 50 deletions
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 9fe91b98d2..c74d453361 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -13,12 +13,10 @@ endif ifeq "x$(RPM_OS)" "xsuse" REQUIRES=/sbin/chkconfig /sbin/service -OS_DEFINES=--define '_initrddir /etc/init.d' -RELEASE_OS=.suse +OS_DEFINES=--define '_initrddir /etc/init.d' --define 'dist .suse' else REQUIRES=chkconfig initscripts OS_DEFINES=--define '_initrddir /etc/rc.d/init.d' -RELEASE_OS= endif rpms: clean server @@ -27,7 +25,7 @@ prepare: mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp cp $(TOP_DIR)/$(TARBALL) SOURCES cp rabbitmq-server.spec SPECS - sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|;s|%%RELEASE_OS%%|$(RELEASE_OS)|' \ + sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \ SPECS/rabbitmq-server.spec cp init.d SOURCES/rabbitmq-server.init diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 3c3be609ce..875381e807 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -2,7 +2,7 @@ Name: rabbitmq-server Version: %%VERSION%% -Release: 1%%RELEASE_OS%% +Release: 1%{?dist} License: MPLv1.1 Group: Development/Libraries Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile index 659132568f..805bc3fd89 100644 --- a/packaging/macports/net/rabbitmq-server/Portfile +++ b/packaging/macports/net/rabbitmq-server/Portfile @@ -32,6 +32,8 @@ set serverhome ${prefix}/var/lib/rabbitmq set logdir ${prefix}/var/log/rabbitmq set mnesiadbdir ${prefix}/var/lib/rabbitmq/mnesia set plistloc ${prefix}/etc/LaunchDaemons/org.macports.rabbitmq-server +set sbindir ${destroot}${prefix}/lib/rabbitmq/bin +set wrappersbin ${destroot}${prefix}/sbin use_configure no @@ -41,7 +43,7 @@ build.args PYTHON=${prefix}/bin/python2.5 destroot.destdir \ TARGET_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \ - SBIN_DIR=${destroot}${prefix}/sbin \ + SBIN_DIR=${sbindir} \ MAN_DIR=${destroot}${prefix}/share/man destroot.keepdirs \ @@ -59,32 +61,36 @@ post-destroot { xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir} reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \ - ${destroot}${prefix}/sbin/rabbitmq-multi \ - ${destroot}${prefix}/sbin/rabbitmq-server \ - ${destroot}${prefix}/sbin/rabbitmqctl + ${sbindir}/rabbitmq-multi \ + ${sbindir}/rabbitmq-server \ + ${sbindir}/rabbitmqctl reinplace -E "s:(RABBITMQ_CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \ - ${destroot}${prefix}/sbin/rabbitmq-multi \ - ${destroot}${prefix}/sbin/rabbitmq-server \ - ${destroot}${prefix}/sbin/rabbitmqctl + ${sbindir}/rabbitmq-multi \ + ${sbindir}/rabbitmq-server \ + ${sbindir}/rabbitmqctl reinplace -E "s:(RABBITMQ_LOG_BASE)=/:\\1=${prefix}/:" \ - ${destroot}${prefix}/sbin/rabbitmq-multi \ - ${destroot}${prefix}/sbin/rabbitmq-server \ - ${destroot}${prefix}/sbin/rabbitmqctl + ${sbindir}/rabbitmq-multi \ + ${sbindir}/rabbitmq-server \ + ${sbindir}/rabbitmqctl reinplace -E "s:(RABBITMQ_MNESIA_BASE)=/:\\1=${prefix}/:" \ - ${destroot}${prefix}/sbin/rabbitmq-multi \ - ${destroot}${prefix}/sbin/rabbitmq-server \ - ${destroot}${prefix}/sbin/rabbitmqctl + ${sbindir}/rabbitmq-multi \ + ${sbindir}/rabbitmq-server \ + ${sbindir}/rabbitmqctl reinplace -E "s:(RABBITMQ_PIDS_FILE)=/:\\1=${prefix}/:" \ - ${destroot}${prefix}/sbin/rabbitmq-multi \ - ${destroot}${prefix}/sbin/rabbitmq-server \ - ${destroot}${prefix}/sbin/rabbitmqctl + ${sbindir}/rabbitmq-multi \ + ${sbindir}/rabbitmq-server \ + ${sbindir}/rabbitmqctl - file rename ${destroot}${prefix}/sbin/rabbitmqctl ${destroot}${prefix}/sbin/rabbitmqctl_real - xinstall -m 555 ${filespath}/rabbitmqctl_wrapper ${destroot}${prefix}/sbin - file rename ${destroot}${prefix}/sbin/rabbitmqctl_wrapper ${destroot}${prefix}/sbin/rabbitmqctl + xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \ + ${wrappersbin}/rabbitmq-multi + + reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \ + ${wrappersbin}/rabbitmq-multi + reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \ + ${wrappersbin}/rabbitmq-multi + file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server + file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl - reinplace -E "s:@PREFIX@:${prefix}:" \ - ${destroot}${prefix}/sbin/rabbitmqctl } pre-install { diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper new file mode 100644 index 0000000000..296a77d19c --- /dev/null +++ b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper @@ -0,0 +1,23 @@ +#!/bin/bash +# Escape spaces and quotes, because shell is revolting. +for arg in "$@" ; do + # Escape quotes in parameters, so that they're passed through cleanly. + arg=$(sed -e 's/"/\\"/' <<-END + $arg + END + ) + CMDLINE="${CMDLINE} \"${arg}\"" +done + +cd /var/lib/rabbitmq + +SCRIPT=`basename $0` + +if [ `id -u` = 0 ] ; then + su rabbitmq -s /bin/sh -c "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}" +else + /usr/lib/rabbitmq/bin/${SCRIPT} + echo -e "\nOnly root should run ${SCRIPT}\n" + exit 1 +fi + diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper deleted file mode 100644 index 1996811eb5..0000000000 --- a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -exec sudo -H -u rabbitmq "@PREFIX@/sbin/rabbitmqctl_real" "$@" diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 81dea02789..f45f931e81 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -151,7 +151,7 @@ all_ch_record() -> [C || {{ch, _}, C} <- get()]. is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> - Limited orelse Count > ?UNSENT_MESSAGE_LIMIT. + Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. ch_record_state_transition(OldCR, NewCR) -> BlockedOld = is_ch_blocked(OldCR), @@ -178,12 +178,8 @@ deliver_queue(Fun, FunAcc0, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), IsMsgReady = Fun(is_message_ready, FunAcc0, State), - case IsMsgReady - andalso - ( (not AckRequired) - orelse - rabbit_limiter:can_send( LimiterPid, self() ) - ) of + case (IsMsgReady andalso + rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of true -> case Fun(AckRequired, FunAcc0, State) of {empty, FunAcc1, State2} -> @@ -611,7 +607,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, reply({error, exclusive_consume_unavailable}, State); ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), - Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, + Consumer = #consumer{tag = ConsumerTag, + ack_required = not(NoAck)}, store_ch_record(C#cr{consumers = [Consumer | Consumers], limiter_pid = LimiterPid}), if Consumers == [] -> @@ -619,15 +616,23 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, true -> ok end, + ExclusiveConsumer = + if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> ExistingHolder + end, State1 = State#q{has_had_consumers = true, - exclusive_consumer = - if - ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder - end, - round_robin = queue:in({ChPid, Consumer}, RoundRobin)}, + exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), - reply(ok, run_message_queue(State1)) + State2 = + case is_ch_blocked(C) of + true -> State1; + false -> run_message_queue( + State1 #q { + round_robin = queue:in( + {ChPid, Consumer}, + RoundRobin)}) + end, + reply(ok, State2) end end; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 3f9b6ebb9b..9f3dcbd071 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -36,7 +36,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1, shutdown/1]). --export([limit/2, can_send/2, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/3, ack/2, register/2, unregister/2]). %%---------------------------------------------------------------------------- @@ -47,7 +47,7 @@ -spec(start_link/1 :: (pid()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(can_send/2 :: (maybe_pid(), pid()) -> bool()). +-spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). @@ -85,12 +85,13 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid) -> +can_send(undefined, _QPid, _AckRequired) -> true; -can_send(LimiterPid, QPid) -> +can_send(LimiterPid, QPid, AckRequired) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end). + fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, + infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer @@ -110,10 +111,13 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}) init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) -> +handle_call({can_send, QPid, AckRequired}, _From, + State = #lim{volume = Volume}) -> case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = Volume + 1}} + false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end}} end. handle_cast(shutdown, State) -> |
