diff options
| -rw-r--r-- | codegen.py | 36 | ||||
| -rwxr-xr-x | scripts/rabbitmq-multi | 10 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 10 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 9 |
6 files changed, 78 insertions, 31 deletions
diff --git a/codegen.py b/codegen.py index 20bfc94796..6f39574f26 100644 --- a/codegen.py +++ b/codegen.py @@ -92,6 +92,40 @@ class PackedMethodBitField: def full(self): return self.count() == 8 + +def printFileHeader(): + print """%% Autogenerated code. Do not edit. +%% +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%%""" def genErl(spec): def erlType(domain): @@ -251,6 +285,7 @@ def genErl(spec): methods = spec.allMethods() + printFileHeader() print """-module(rabbit_framing). -include("rabbit_framing.hrl"). @@ -325,6 +360,7 @@ def genHrl(spec): methods = spec.allMethods() + printFileHeader() print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major) print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor) print "-define(PROTOCOL_PORT, %d)." % (spec.port) diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 1a7eb97e08..a6eb102a4b 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -30,8 +30,6 @@ ## Contributor(s): ______________________________________. ## NODENAME=rabbit -NODE_IP_ADDRESS=0.0.0.0 -NODE_PORT=5672 SCRIPT_HOME=$(dirname $0) PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= @@ -40,14 +38,18 @@ CONFIG_FILE=/etc/rabbitmq/rabbitmq . `dirname $0`/rabbitmq-env +DEFAULT_NODE_IP_ADDRESS=0.0.0.0 +DEFAULT_NODE_PORT=5672 +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} +[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] then if [ "x" != "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} + then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} fi else if [ "x" = "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_PORT=${NODE_PORT} + then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} fi fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 7f08cd9d75..cbc295f7d9 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -31,8 +31,6 @@ ## NODENAME=rabbit -NODE_IP_ADDRESS=0.0.0.0 -NODE_PORT=5672 SERVER_ERL_ARGS="+K true +A30 \ -kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \ -kernel inet_default_connect_options [{nodelay,true}]" @@ -44,14 +42,18 @@ SERVER_START_ARGS= . `dirname $0`/rabbitmq-env +DEFAULT_NODE_IP_ADDRESS=0.0.0.0 +DEFAULT_NODE_PORT=5672 +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} +[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] then if [ "x" != "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} + then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} fi else if [ "x" = "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_PORT=${NODE_PORT} + then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} fi fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d728ef6a76..515dbf6823 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -266,7 +266,7 @@ requeue(QPid, MsgIds, ChPid) -> gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). + gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> safe_pmap_ok( diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c20cb16ca1..7e195d2fcb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -526,24 +526,24 @@ handle_method(#'basic.recover'{requeue = false}, _, State = #ch{ transaction_id = none, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> - lists:foreach( - fun ({_DeliveryTag, none, _Msg}) -> - %% Was sent as a basic.get_ok. Don't redeliver - %% it. FIXME: appropriate? - ok; - ({DeliveryTag, ConsumerTag, - {QName, QPid, MsgId, _Redelivered, Message}}) -> - %% Was sent as a proper consumer delivery. Resend it as - %% before. - %% - %% FIXME: What should happen if the consumer's been - %% cancelled since? - %% - %% FIXME: should we allocate a fresh DeliveryTag? - ok = internal_deliver( + ok = rabbit_misc:queue_fold( + fun ({_DeliveryTag, none, _Msg}, ok) -> + %% Was sent as a basic.get_ok. Don't redeliver + %% it. FIXME: appropriate? + ok; + ({DeliveryTag, ConsumerTag, + {QName, QPid, MsgId, _Redelivered, Message}}, ok) -> + %% Was sent as a proper consumer delivery. Resend + %% it as before. + %% + %% FIXME: What should happen if the consumer's been + %% cancelled since? + %% + %% FIXME: should we allocate a fresh DeliveryTag? + internal_deliver( WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) - end, queue:to_list(UAMQ)), + end, ok, UAMQ), %% No answer required, apparently! {noreply, State}; @@ -872,7 +872,7 @@ rollback_and_notify(State) -> notify_queues(internal_rollback(State)). fold_per_queue(F, Acc0, UAQ) -> - D = lists:foldl( + D = rabbit_misc:queue_fold( fun ({_DTag, _CTag, {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> %% dict:append would be simpler and avoid the @@ -883,7 +883,7 @@ fold_per_queue(F, Acc0, UAQ) -> fun (MsgIds) -> [MsgId | MsgIds] end, [MsgId], D) - end, dict:new(), queue:to_list(UAQ)), + end, dict:new(), UAQ), dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). @@ -912,9 +912,9 @@ consumer_queues(Consumers) -> notify_limiter(undefined, _Acked) -> ok; notify_limiter(LimiterPid, Acked) -> - case lists:foldl(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, queue:to_list(Acked)) of + case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of 0 -> ok; Count -> rabbit_limiter:ack(LimiterPid, Count) end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index d927bfb1f9..0866da3fb0 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -55,7 +55,7 @@ -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). --export([unfold/2, ceil/1]). +-export([unfold/2, ceil/1, queue_fold/3]). -import(mnesia). -import(lists). @@ -126,6 +126,7 @@ -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> number()). +-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). -endif. @@ -492,3 +493,9 @@ ceil(N) -> 0 -> N; _ -> 1 + T end. + +queue_fold(Fun, Init, Q) -> + case queue:out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + end. |
