summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--codegen.py36
-rwxr-xr-xscripts/rabbitmq-multi10
-rwxr-xr-xscripts/rabbitmq-server10
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_channel.erl42
-rw-r--r--src/rabbit_misc.erl9
6 files changed, 78 insertions, 31 deletions
diff --git a/codegen.py b/codegen.py
index 20bfc94796..6f39574f26 100644
--- a/codegen.py
+++ b/codegen.py
@@ -92,6 +92,40 @@ class PackedMethodBitField:
def full(self):
return self.count() == 8
+
+def printFileHeader():
+ print """%% Autogenerated code. Do not edit.
+%%
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%"""
def genErl(spec):
def erlType(domain):
@@ -251,6 +285,7 @@ def genErl(spec):
methods = spec.allMethods()
+ printFileHeader()
print """-module(rabbit_framing).
-include("rabbit_framing.hrl").
@@ -325,6 +360,7 @@ def genHrl(spec):
methods = spec.allMethods()
+ printFileHeader()
print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major)
print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor)
print "-define(PROTOCOL_PORT, %d)." % (spec.port)
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 1a7eb97e08..a6eb102a4b 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -30,8 +30,6 @@
## Contributor(s): ______________________________________.
##
NODENAME=rabbit
-NODE_IP_ADDRESS=0.0.0.0
-NODE_PORT=5672
SCRIPT_HOME=$(dirname $0)
PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
@@ -40,14 +38,18 @@ CONFIG_FILE=/etc/rabbitmq/rabbitmq
. `dirname $0`/rabbitmq-env
+DEFAULT_NODE_IP_ADDRESS=0.0.0.0
+DEFAULT_NODE_PORT=5672
+[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ]
then
if [ "x" != "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+ then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
fi
else
if [ "x" = "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_PORT=${NODE_PORT}
+ then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
fi
fi
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 7f08cd9d75..cbc295f7d9 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -31,8 +31,6 @@
##
NODENAME=rabbit
-NODE_IP_ADDRESS=0.0.0.0
-NODE_PORT=5672
SERVER_ERL_ARGS="+K true +A30 \
-kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \
-kernel inet_default_connect_options [{nodelay,true}]"
@@ -44,14 +42,18 @@ SERVER_START_ARGS=
. `dirname $0`/rabbitmq-env
+DEFAULT_NODE_IP_ADDRESS=0.0.0.0
+DEFAULT_NODE_PORT=5672
+[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ]
then
if [ "x" != "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+ then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
fi
else
if [ "x" = "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_PORT=${NODE_PORT}
+ then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
fi
fi
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d728ef6a76..515dbf6823 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -266,7 +266,7 @@ requeue(QPid, MsgIds, ChPid) ->
gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}).
+ gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
safe_pmap_ok(
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c20cb16ca1..7e195d2fcb 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -526,24 +526,24 @@ handle_method(#'basic.recover'{requeue = false},
_, State = #ch{ transaction_id = none,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
- lists:foreach(
- fun ({_DeliveryTag, none, _Msg}) ->
- %% Was sent as a basic.get_ok. Don't redeliver
- %% it. FIXME: appropriate?
- ok;
- ({DeliveryTag, ConsumerTag,
- {QName, QPid, MsgId, _Redelivered, Message}}) ->
- %% Was sent as a proper consumer delivery. Resend it as
- %% before.
- %%
- %% FIXME: What should happen if the consumer's been
- %% cancelled since?
- %%
- %% FIXME: should we allocate a fresh DeliveryTag?
- ok = internal_deliver(
+ ok = rabbit_misc:queue_fold(
+ fun ({_DeliveryTag, none, _Msg}, ok) ->
+ %% Was sent as a basic.get_ok. Don't redeliver
+ %% it. FIXME: appropriate?
+ ok;
+ ({DeliveryTag, ConsumerTag,
+ {QName, QPid, MsgId, _Redelivered, Message}}, ok) ->
+ %% Was sent as a proper consumer delivery. Resend
+ %% it as before.
+ %%
+ %% FIXME: What should happen if the consumer's been
+ %% cancelled since?
+ %%
+ %% FIXME: should we allocate a fresh DeliveryTag?
+ internal_deliver(
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
- end, queue:to_list(UAMQ)),
+ end, ok, UAMQ),
%% No answer required, apparently!
{noreply, State};
@@ -872,7 +872,7 @@ rollback_and_notify(State) ->
notify_queues(internal_rollback(State)).
fold_per_queue(F, Acc0, UAQ) ->
- D = lists:foldl(
+ D = rabbit_misc:queue_fold(
fun ({_DTag, _CTag,
{_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
%% dict:append would be simpler and avoid the
@@ -883,7 +883,7 @@ fold_per_queue(F, Acc0, UAQ) ->
fun (MsgIds) -> [MsgId | MsgIds] end,
[MsgId],
D)
- end, dict:new(), queue:to_list(UAQ)),
+ end, dict:new(), UAQ),
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
@@ -912,9 +912,9 @@ consumer_queues(Consumers) ->
notify_limiter(undefined, _Acked) ->
ok;
notify_limiter(LimiterPid, Acked) ->
- case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, queue:to_list(Acked)) of
+ case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d927bfb1f9..0866da3fb0 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -55,7 +55,7 @@
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
--export([unfold/2, ceil/1]).
+-export([unfold/2, ceil/1, queue_fold/3]).
-import(mnesia).
-import(lists).
@@ -126,6 +126,7 @@
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> number()).
+-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
-endif.
@@ -492,3 +493,9 @@ ceil(N) ->
0 -> N;
_ -> 1 + T
end.
+
+queue_fold(Fun, Init, Q) ->
+ case queue:out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
+ end.