summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-02-23 12:43:54 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-02-23 12:43:54 +0000
commit2725b8ce01c2636ac6fbfe375d1c936653363d8e (patch)
treef24a0cee1e615e8daf56dc049700de1975414df7 /src
parent56f4ccba310376979e4a59028ef710383d6dd04f (diff)
parent4230ebb9c24e607e240192894ab32408b8372634 (diff)
downloadrabbitmq-server-git-2725b8ce01c2636ac6fbfe375d1c936653363d8e.tar.gz
Merge bug24482
Diffstat (limited to 'src')
-rw-r--r--src/credit_flow.erl127
-rw-r--r--src/delegate.erl2
-rw-r--r--src/delegate_sup.erl2
-rw-r--r--src/file_handle_cache.erl13
-rw-r--r--src/gatherer.erl2
-rw-r--r--src/gen_server2.erl20
-rw-r--r--src/gm.erl44
-rw-r--r--src/gm_soak_test.erl2
-rw-r--r--src/gm_speed_test.erl2
-rw-r--r--src/gm_tests.erl2
-rw-r--r--src/lqueue.erl2
-rw-r--r--src/mirrored_supervisor.erl67
-rw-r--r--src/mirrored_supervisor_tests.erl29
-rw-r--r--src/mnesia_sync.erl77
-rw-r--r--src/pg_local.erl2
-rw-r--r--src/priority_queue.erl2
-rw-r--r--src/rabbit.erl78
-rw-r--r--src/rabbit_access_control.erl3
-rw-r--r--src/rabbit_alarm.erl5
-rw-r--r--src/rabbit_amqqueue.erl103
-rw-r--r--src/rabbit_amqqueue_process.erl118
-rw-r--r--src/rabbit_amqqueue_sup.erl2
-rw-r--r--src/rabbit_auth_backend.erl2
-rw-r--r--src/rabbit_auth_backend_internal.erl2
-rw-r--r--src/rabbit_auth_mechanism.erl2
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl2
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl2
-rw-r--r--src/rabbit_auth_mechanism_plain.erl2
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_basic.erl10
-rw-r--r--src/rabbit_binary_generator.erl2
-rw-r--r--src/rabbit_binary_parser.erl2
-rw-r--r--src/rabbit_binding.erl24
-rw-r--r--src/rabbit_channel.erl284
-rw-r--r--src/rabbit_channel_sup.erl2
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_client_sup.erl7
-rw-r--r--src/rabbit_command_assembler.erl2
-rw-r--r--src/rabbit_connection_sup.erl2
-rw-r--r--src/rabbit_control.erl42
-rw-r--r--src/rabbit_direct.erl2
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_error_logger_file_h.erl2
-rw-r--r--src/rabbit_event.erl2
-rw-r--r--src/rabbit_exchange.erl20
-rw-r--r--src/rabbit_exchange_type.erl2
-rw-r--r--src/rabbit_exchange_type_direct.erl2
-rw-r--r--src/rabbit_exchange_type_fanout.erl2
-rw-r--r--src/rabbit_exchange_type_headers.erl2
-rw-r--r--src/rabbit_exchange_type_invalid.erl47
-rw-r--r--src/rabbit_exchange_type_topic.erl125
-rw-r--r--src/rabbit_file.erl2
-rw-r--r--src/rabbit_framing.erl2
-rw-r--r--src/rabbit_guid.erl83
-rw-r--r--src/rabbit_heartbeat.erl2
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_log.erl112
-rw-r--r--src/rabbit_memory_monitor.erl42
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl13
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_mirror_queue_misc.erl16
-rw-r--r--src/rabbit_mirror_queue_slave.erl134
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl2
-rw-r--r--src/rabbit_misc.erl81
-rw-r--r--src/rabbit_mnesia.erl85
-rw-r--r--src/rabbit_msg_file.erl2
-rw-r--r--src/rabbit_msg_store.erl67
-rw-r--r--src/rabbit_msg_store_ets_index.erl2
-rw-r--r--src/rabbit_msg_store_gc.erl2
-rw-r--r--src/rabbit_msg_store_index.erl2
-rw-r--r--src/rabbit_net.erl22
-rw-r--r--src/rabbit_networking.erl169
-rw-r--r--src/rabbit_node_monitor.erl25
-rw-r--r--src/rabbit_nodes.erl94
-rw-r--r--src/rabbit_plugins.erl36
-rw-r--r--src/rabbit_prelaunch.erl24
-rw-r--r--src/rabbit_queue_collector.erl2
-rw-r--r--src/rabbit_queue_index.erl4
-rw-r--r--src/rabbit_reader.erl248
-rw-r--r--src/rabbit_registry.erl2
-rw-r--r--src/rabbit_restartable_sup.erl5
-rw-r--r--src/rabbit_router.erl62
-rw-r--r--src/rabbit_sasl_report_file_h.erl2
-rw-r--r--src/rabbit_ssl.erl36
-rw-r--r--src/rabbit_sup.erl2
-rw-r--r--src/rabbit_tests.erl84
-rw-r--r--src/rabbit_tests_event_receiver.erl2
-rw-r--r--src/rabbit_trace.erl2
-rw-r--r--src/rabbit_types.erl16
-rw-r--r--src/rabbit_upgrade.erl2
-rw-r--r--src/rabbit_upgrade_functions.erl10
-rw-r--r--src/rabbit_variable_queue.erl19
-rw-r--r--src/rabbit_version.erl2
-rw-r--r--src/rabbit_vhost.erl2
-rw-r--r--src/rabbit_writer.erl7
-rw-r--r--src/supervisor2.erl6
-rw-r--r--src/tcp_acceptor.erl39
-rw-r--r--src/tcp_acceptor_sup.erl8
-rw-r--r--src/tcp_listener.erl16
-rw-r--r--src/tcp_listener_sup.erl14
-rw-r--r--src/test_sup.erl2
-rw-r--r--src/vm_memory_monitor.erl2
-rw-r--r--src/worker_pool.erl9
-rw-r--r--src/worker_pool_sup.erl2
-rw-r--r--src/worker_pool_worker.erl12
106 files changed, 1783 insertions, 1172 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
new file mode 100644
index 0000000000..072f4d9dc5
--- /dev/null
+++ b/src/credit_flow.erl
@@ -0,0 +1,127 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(credit_flow).
+
+%% Credit flow is controlled by a credit specification - a
+%% {InitialCredit, MoreCreditAfter} tuple. For the message sender,
+%% credit starts at InitialCredit and is decremented with every
+%% message sent. The message receiver grants more credit to the sender
+%% by sending it a {bump_credit, ...} control message after receiving
+%% MoreCreditAfter messages. The sender should pass this message in to
+%% handle_bump_msg/1. The sender should block when it goes below 0
+%% (check by invoking blocked/0). If a process is both a sender and a
+%% receiver it will not grant any more credit to its senders when it
+%% is itself blocked - thus the only processes that need to check
+%% blocked/0 are ones that read from network sockets.
+
+-define(DEFAULT_CREDIT, {200, 50}).
+
+-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]).
+-export([peer_down/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-opaque(bump_msg() :: {pid(), non_neg_integer()}).
+-type(credit_spec() :: {non_neg_integer(), non_neg_integer()}).
+
+-spec(send/1 :: (pid()) -> 'ok').
+-spec(send/2 :: (pid(), credit_spec()) -> 'ok').
+-spec(ack/1 :: (pid()) -> 'ok').
+-spec(ack/2 :: (pid(), credit_spec()) -> 'ok').
+-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok').
+-spec(blocked/0 :: () -> boolean()).
+-spec(peer_down/1 :: (pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% There are two "flows" here; of messages and of credit, going in
+%% opposite directions. The variable names "From" and "To" refer to
+%% the flow of credit, but the function names refer to the flow of
+%% messages. This is the clearest I can make it (since the function
+%% names form the API and want to make sense externally, while the
+%% variable names are used in credit bookkeeping and want to make
+%% sense internally).
+
+%% For any given pair of processes, ack/2 and send/2 must always be
+%% called with the same credit_spec().
+
+send(From) -> send(From, ?DEFAULT_CREDIT).
+
+send(From, {InitialCredit, _MoreCreditAfter}) ->
+ update({credit_from, From}, InitialCredit,
+ fun (1) -> block(From),
+ 0;
+ (C) -> C - 1
+ end).
+
+ack(To) -> ack(To, ?DEFAULT_CREDIT).
+
+ack(To, {_InitialCredit, MoreCreditAfter}) ->
+ update({credit_to, To}, MoreCreditAfter,
+ fun (1) -> grant(To, MoreCreditAfter),
+ MoreCreditAfter;
+ (C) -> C - 1
+ end).
+
+handle_bump_msg({From, MoreCredit}) ->
+ update({credit_from, From}, 0,
+ fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
+ C + MoreCredit;
+ (C) -> C + MoreCredit
+ end).
+
+blocked() -> get(credit_blocked, []) =/= [].
+
+peer_down(Peer) ->
+ %% In theory we could also remove it from credit_deferred here, but it
+ %% doesn't really matter; at some point later we will drain
+ %% credit_deferred and thus send messages into the void...
+ unblock(Peer),
+ erase({credit_from, Peer}),
+ erase({credit_to, Peer}).
+
+%% --------------------------------------------------------------------------
+
+grant(To, Quantity) ->
+ Msg = {bump_credit, {self(), Quantity}},
+ case blocked() of
+ false -> To ! Msg;
+ true -> update(credit_deferred, [],
+ fun (Deferred) -> [{To, Msg} | Deferred] end)
+ end.
+
+block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end).
+
+unblock(From) ->
+ update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end),
+ case blocked() of
+ false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
+ erase(credit_deferred);
+ true -> ok
+ end.
+
+get(Key, Default) ->
+ case get(Key) of
+ undefined -> Default;
+ Value -> Value
+ end.
+
+update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok.
diff --git a/src/delegate.erl b/src/delegate.erl
index edb4eba4ae..d595e4819e 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(delegate).
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 4c131a6c59..2a8b915b89 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(delegate_sup).
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 6c3f1b5f50..59a0ab1cc0 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(file_handle_cache).
@@ -125,8 +125,7 @@
%% requesting process is considered to 'own' one more
%% descriptor. release/0 is the inverse operation and releases a
%% previously obtained descriptor. transfer/1 transfers ownership of a
-%% file descriptor between processes. It is non-blocking. Obtain is
-%% used to obtain permission to accept file descriptors. Obtain has a
+%% file descriptor between processes. It is non-blocking. Obtain has a
%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use
%% the entire limit, but will be evicted by obtain calls up to the
%% point at which no more obtain calls can be satisfied by the obtains
@@ -262,7 +261,7 @@
-endif.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [obtain_count, obtain_limit]).
+-define(INFO_KEYS, [total_limit, total_used, sockets_limit, sockets_used]).
%%----------------------------------------------------------------------------
%% Public API
@@ -790,8 +789,10 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(obtain_count, #fhc_state{obtain_count = Count}) -> Count;
-i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
+i(total_limit, #fhc_state{limit = Limit}) -> Limit;
+i(total_used, #fhc_state{open_count = C1, obtain_count = C2}) -> C1 + C2;
+i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
+i(sockets_used, #fhc_state{obtain_count = Count}) -> Count;
i(Item, _) -> throw({bad_argument, Item}).
%%----------------------------------------------------------------------------
diff --git a/src/gatherer.erl b/src/gatherer.erl
index fe976b50a2..98b360389a 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gatherer).
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index ab6c4e64f3..f8537487cd 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -73,7 +73,7 @@
%% but where the second argument is specifically the priority_queue
%% which contains the prioritised message_queue.
-%% All modifications are (C) 2009-2011 VMware, Inc.
+%% All modifications are (C) 2009-2012 VMware, Inc.
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -1079,7 +1079,7 @@ get_proc_name({local, Name}) ->
exit(process_not_registered)
end;
get_proc_name({global, Name}) ->
- case global:safe_whereis_name(Name) of
+ case whereis_name(Name) of
undefined ->
exit(process_not_registered_globally);
Pid when Pid =:= self() ->
@@ -1101,7 +1101,7 @@ get_parent() ->
name_to_pid(Name) ->
case whereis(Name) of
undefined ->
- case global:safe_whereis_name(Name) of
+ case whereis_name(Name) of
undefined ->
exit(could_not_find_registerd_name);
Pid ->
@@ -1111,6 +1111,20 @@ name_to_pid(Name) ->
Pid
end.
+whereis_name(Name) ->
+ case ets:lookup(global_names, Name) of
+ [{_Name, Pid, _Method, _RPid, _Ref}] ->
+ if node(Pid) == node() ->
+ case is_process_alive(Pid) of
+ true -> Pid;
+ false -> undefined
+ end;
+ true ->
+ Pid
+ end;
+ [] -> undefined
+ end.
+
find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
PrioriCall = function_exported_or_default(
Mod, 'prioritise_call', 3,
diff --git a/src/gm.erl b/src/gm.erl
index 8c838a7056..6f9ff5642b 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm).
@@ -386,6 +386,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
+-define(VERSION_START, 0).
-define(SETS, ordsets).
-define(DICT, orddict).
@@ -515,8 +516,8 @@ group_members(Server) ->
init([GroupName, Module, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
+ Self = make_member(GroupName),
gen_server2:cast(self(), join),
- Self = self(),
{ok, #state { self = Self,
left = {Self, undefined},
right = {Self, undefined},
@@ -541,7 +542,8 @@ handle_call({confirmed_broadcast, Msg}, _From,
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
- handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State});
+ handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
+ ok, State});
handle_call({confirmed_broadcast, Msg}, From, State) ->
internal_broadcast(Msg, From, State);
@@ -604,7 +606,8 @@ handle_cast({broadcast, Msg},
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
- handle_callback_result({Module:handle_msg(Args, Self, Msg), State});
+ handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
+ State});
handle_cast({broadcast, Msg}, State) ->
internal_broadcast(Msg, none, State);
@@ -623,7 +626,7 @@ handle_cast(join, State = #state { self = Self,
State1 = check_neighbours(State #state { view = View,
members_state = MembersState }),
handle_callback_result(
- {Module:joined(Args, all_known_members(View)), State1});
+ {Module:joined(Args, get_pids(all_known_members(View))), State1});
handle_cast(leave, State) ->
{stop, normal, State}.
@@ -817,7 +820,7 @@ internal_broadcast(Msg, From, State = #state { self = Self,
confirms = Confirms,
callback_args = Args,
broadcast_buffer = Buffer }) ->
- Result = Module:handle_msg(Args, Self, Msg),
+ Result = Module:handle_msg(Args, get_pid(Self), Msg),
Buffer1 = [{PubCount, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
@@ -979,7 +982,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
end,
try
case gen_server2:call(
- Left, {add_on_right, Self}, infinity) of
+ get_pid(Left), {add_on_right, Self}, infinity) of
{ok, Group1} -> group_to_view(Group1);
not_ready -> join_group(Self, GroupName)
end
@@ -1005,7 +1008,7 @@ prune_or_create_group(Self, GroupName) ->
mnesia:sync_transaction(
fun () -> GroupNew = #gm_group { name = GroupName,
members = [Self],
- version = 0 },
+ version = ?VERSION_START },
case mnesia:read({?GROUP_TABLE, GroupName}) of
[] ->
mnesia:write(GroupNew),
@@ -1114,24 +1117,25 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
{Self, undefined};
ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
- ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}),
+ ok = gen_server2:cast(get_pid(RealNeighbour),
+ {?TAG, Ver, check_neighbours}),
{RealNeighbour, maybe_monitor(RealNeighbour, Self)};
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
{RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
true = erlang:demonitor(MRef),
Msg = {?TAG, Ver, check_neighbours},
- ok = gen_server2:cast(RealNeighbour, Msg),
+ ok = gen_server2:cast(get_pid(RealNeighbour), Msg),
ok = case Neighbour of
Self -> ok;
- _ -> gen_server2:cast(Neighbour, Msg)
+ _ -> gen_server2:cast(get_pid(Neighbour), Msg)
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
maybe_monitor(Self, Self) ->
undefined;
maybe_monitor(Other, _Self) ->
- erlang:monitor(process, Other).
+ erlang:monitor(process, get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1238,6 +1242,15 @@ prepare_members_state(MembersState) ->
build_members_state(MembersStateList) ->
?DICT:from_list(MembersStateList).
+make_member(GroupName) ->
+ {case read_group(GroupName) of
+ #gm_group { version = Version } -> Version;
+ {error, not_found} -> ?VERSION_START
+ end, self()}.
+
+get_pid({_Version, Pid}) -> Pid.
+
+get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
%% ---------------------------------------------------------------------------
%% Activity assembly
@@ -1262,13 +1275,13 @@ maybe_send_activity(Activity, #state { self = Self,
send_right(Right, View, {activity, Self, Activity}).
send_right(Right, View, Msg) ->
- ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}).
+ ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).
callback(Args, Module, Activity) ->
lists:foldl(
fun ({Id, Pubs, _Acks}, ok) ->
lists:foldl(fun ({_PubNum, Pub}, ok) ->
- Module:handle_msg(Args, Id, Pub);
+ Module:handle_msg(Args, get_pid(Id), Pub);
(_, Error) ->
Error
end, ok, Pubs);
@@ -1283,7 +1296,8 @@ callback_view_changed(Args, Module, OldView, NewView) ->
Deaths = OldMembers -- NewMembers,
case {Births, Deaths} of
{[], []} -> ok;
- _ -> Module:members_changed(Args, Births, Deaths)
+ _ -> Module:members_changed(Args, get_pids(Births),
+ get_pids(Deaths))
end.
handle_callback_result({Result, State}) ->
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 5e5a3a5a6f..572175410d 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm_soak_test).
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index defb0f29b8..dad75bd447 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm_speed_test).
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index ca0ffd6483..0a2d420469 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm_tests).
diff --git a/src/lqueue.erl b/src/lqueue.erl
index 04b4070621..c4e046b521 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(lqueue).
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 8dfe39f890..a599effa91 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(mirrored_supervisor).
@@ -144,32 +144,17 @@
-type child() :: pid() | 'undefined'.
-type child_id() :: term().
--type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | 'undefined'}.
-type modules() :: [module()] | 'dynamic'.
--type restart() :: 'permanent' | 'transient' | 'temporary'.
--type shutdown() :: 'brutal_kill' | timeout().
-type worker() :: 'worker' | 'supervisor'.
-type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}.
-type sup_ref() :: (Name :: atom())
| {Name :: atom(), Node :: node()}
| {'global', Name :: atom()}
| pid().
--type child_spec() :: {Id :: child_id(),
- StartFunc :: mfargs(),
- Restart :: restart(),
- Shutdown :: shutdown(),
- Type :: worker(),
- Modules :: modules()}.
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
--type startchild_err() :: 'already_present'
- | {'already_started', Child :: child()} | term().
--type startchild_ret() :: {'ok', Child :: child()}
- | {'ok', Child :: child(), Info :: term()}
- | {'error', startchild_err()}.
-
-type group_name() :: any().
-spec start_link(GroupName, Module, Args) -> startlink_ret() when
@@ -183,9 +168,9 @@
Module :: module(),
Args :: term().
--spec start_child(SupRef, ChildSpec) -> startchild_ret() when
+-spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when
SupRef :: sup_ref(),
- ChildSpec :: child_spec() | (List :: [term()]).
+ ChildSpec :: supervisor:child_spec() | (List :: [term()]).
-spec restart_child(SupRef, Id) -> Result when
SupRef :: sup_ref(),
@@ -215,12 +200,12 @@
Modules :: modules().
-spec check_childspecs(ChildSpecs) -> Result when
- ChildSpecs :: [child_spec()],
+ ChildSpecs :: [supervisor:child_spec()],
Result :: 'ok' | {'error', Error :: term()}.
-spec start_internal(Group, ChildSpecs) -> Result when
Group :: group_name(),
- ChildSpecs :: [child_spec()],
+ ChildSpecs :: [supervisor:child_spec()],
Result :: startlink_ret().
-spec create_tables() -> Result when
@@ -242,8 +227,10 @@ start_link({global, _SupName}, _Group, _Mod, _Args) ->
start_link0(Prefix, Group, Init) ->
case apply(?SUPERVISOR, start_link,
Prefix ++ [?MODULE, {overall, Group, Init}]) of
- {ok, Pid} -> call(Pid, {init, Pid}),
- {ok, Pid};
+ {ok, Pid} -> case catch call(Pid, {init, Pid}) of
+ ok -> {ok, Pid};
+ E -> E
+ end;
Other -> Other
end.
@@ -346,13 +333,20 @@ handle_call({init, Overall}, _From,
end || Pid <- Rest],
Delegate = child(Overall, delegate),
erlang:monitor(process, Delegate),
- [maybe_start(Group, Delegate, S) || S <- ChildSpecs],
- {reply, ok, State#state{overall = Overall, delegate = Delegate}};
+ State1 = State#state{overall = Overall, delegate = Delegate},
+ case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
+ true -> {reply, ok, State1};
+ false -> {stop, shutdown, State1}
+ end;
handle_call({start_child, ChildSpec}, _From,
State = #state{delegate = Delegate,
group = Group}) ->
- {reply, maybe_start(Group, Delegate, ChildSpec), State};
+ {reply, case maybe_start(Group, Delegate, ChildSpec) of
+ already_in_mnesia -> {error, already_present};
+ {already_in_mnesia, Pid} -> {error, {already_started, Pid}};
+ Else -> Else
+ end, State};
handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
group = Group}) ->
@@ -400,13 +394,16 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason},
%% TODO load balance this
%% No guarantee pg2 will have received the DOWN before us.
Self = self(),
- case lists:sort(?PG2:get_members(Group)) -- [Pid] of
- [Self | _] -> {atomic, ChildSpecs} =
- mnesia:transaction(fun() -> update_all(Pid) end),
- [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
- _ -> ok
- end,
- {noreply, State};
+ R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of
+ [Self | _] -> {atomic, ChildSpecs} =
+ mnesia:transaction(fun() -> update_all(Pid) end),
+ [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
+ _ -> []
+ end,
+ case all_started(R) of
+ true -> {noreply, State};
+ false -> {stop, shutdown, State}
+ end;
handle_info(Info, State) ->
{stop, {unexpected_info, Info}, State}.
@@ -428,8 +425,8 @@ maybe_start(Group, Delegate, ChildSpec) ->
check_start(Group, Delegate, ChildSpec)
end) of
{atomic, start} -> start(Delegate, ChildSpec);
- {atomic, undefined} -> {error, already_present};
- {atomic, Pid} -> {error, {already_started, Pid}};
+ {atomic, undefined} -> already_in_mnesia;
+ {atomic, Pid} -> {already_in_mnesia, Pid};
%% If we are torn down while in the transaction...
{aborted, E} -> {error, E}
end.
@@ -499,6 +496,8 @@ delete_all(Group) ->
[delete(Group, id(C)) ||
C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])].
+all_started(Results) -> [] =:= [R || R = {error, _} <- Results].
+
%%----------------------------------------------------------------------------
create_tables() ->
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index b8d52ae86b..e8baabe8ff 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(mirrored_supervisor_tests).
@@ -36,15 +36,14 @@ all_tests() ->
passed = test_already_there(),
passed = test_delete_restart(),
passed = test_which_children(),
-%% commented out in order to determine whether this is the only test
-%% that is failing - see bug 24362
-%% passed = test_large_group(),
+ passed = test_large_group(),
passed = test_childspecs_at_init(),
passed = test_anonymous_supervisors(),
passed = test_no_migration_on_shutdown(),
passed = test_start_idempotence(),
passed = test_unsupported(),
passed = test_ignore(),
+ passed = test_startup_failure(),
passed.
%% Simplest test
@@ -197,6 +196,22 @@ test_ignore() ->
{sup, fake_strategy_for_ignore, []}),
passed.
+test_startup_failure() ->
+ [test_startup_failure(F) || F <- [want_error, want_exit]],
+ passed.
+
+test_startup_failure(Fail) ->
+ process_flag(trap_exit, true),
+ ?MS:start_link(get_group(group), ?MODULE,
+ {sup, one_for_one, [childspec(Fail)]}),
+ receive
+ {'EXIT', _, shutdown} ->
+ ok
+ after 1000 ->
+ exit({did_not_exit, Fail})
+ end,
+ process_flag(trap_exit, false).
+
%% ---------------------------------------------------------------------------
with_sups(Fun, Sups) ->
@@ -230,6 +245,12 @@ start_sup0(Name, Group, ChildSpecs) ->
childspec(Id) ->
{Id, {?MODULE, start_gs, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}.
+start_gs(want_error) ->
+ {error, foo};
+
+start_gs(want_exit) ->
+ exit(foo);
+
start_gs(Id) ->
gen_server:start_link({local, Id}, ?MODULE, server, []).
diff --git a/src/mnesia_sync.erl b/src/mnesia_sync.erl
new file mode 100644
index 0000000000..a3773d90b2
--- /dev/null
+++ b/src/mnesia_sync.erl
@@ -0,0 +1,77 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(mnesia_sync).
+
+%% mnesia:sync_transaction/3 fails to guarantee that the log is flushed to disk
+%% at commit. This module is an attempt to minimise the risk of data loss by
+%% performing a coalesced log fsync. Unfortunately this is performed regardless
+%% of whether or not the log was appended to.
+
+-behaviour(gen_server).
+
+-export([sync/0]).
+
+-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {waiting, disc_node}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(sync/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+sync() ->
+ gen_server:call(?SERVER, sync, infinity).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state{disc_node = mnesia:system_info(use_dir), waiting = []}}.
+
+handle_call(sync, _From, #state{disc_node = false} = State) ->
+ {reply, ok, State};
+handle_call(sync, From, #state{waiting = Waiting} = State) ->
+ {noreply, State#state{waiting = [From | Waiting]}, 0};
+handle_call(Request, _From, State) ->
+ {stop, {unhandled_call, Request}, State}.
+
+handle_cast(Request, State) ->
+ {stop, {unhandled_cast, Request}, State}.
+
+handle_info(timeout, #state{waiting = Waiting} = State) ->
+ ok = disk_log:sync(latest_log),
+ [gen_server:reply(From, ok) || From <- Waiting],
+ {noreply, State#state{waiting = []}};
+handle_info(Message, State) ->
+ {stop, {unhandled_info, Message}, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/pg_local.erl b/src/pg_local.erl
index c9c3a3a715..e2e82f1f4b 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -13,7 +13,7 @@
%% versions of Erlang/OTP. The remaining type specs have been
%% removed.
-%% All modifications are (C) 2010-2011 VMware, Inc.
+%% All modifications are (C) 2010-2012 VMware, Inc.
%% %CopyrightBegin%
%%
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 4fc8b46972..780fa2e92f 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% Priority queues have essentially the same interface as ordinary
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 0a2681a219..0a0ca90a63 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit).
@@ -45,6 +45,12 @@
{requires, file_handle_cache},
{enables, external_infrastructure}]}).
+-rabbit_boot_step({database_sync,
+ [{description, "database sync"},
+ {mfa, {rabbit_sup, start_child, [mnesia_sync]}},
+ {requires, database},
+ {enables, external_infrastructure}]}).
+
-rabbit_boot_step({file_handle_cache,
[{description, "file handle cache server"},
{mfa, {rabbit_sup, start_restartable_child,
@@ -132,7 +138,7 @@
-rabbit_boot_step({recovery,
[{description, "exchange, queue and binding recovery"},
{mfa, {rabbit, recover, []}},
- {requires, empty_db_check},
+ {requires, core_initialized},
{enables, routing_ready}]}).
-rabbit_boot_step({mirror_queue_slave_sup,
@@ -158,8 +164,9 @@
{enables, networking}]}).
-rabbit_boot_step({direct_client,
- [{mfa, {rabbit_direct, boot, []}},
- {requires, log_relay}]}).
+ [{description, "direct client"},
+ {mfa, {rabbit_direct, boot, []}},
+ {requires, log_relay}]}).
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
@@ -190,7 +197,7 @@
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]).
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
@@ -308,17 +315,28 @@ stop_and_halt() ->
ok.
status() ->
- [{pid, list_to_integer(os:getpid())},
- {running_applications, application:which_applications(infinity)},
- {os, os:type()},
- {erlang_version, erlang:system_info(system_version)},
- {memory, erlang:memory()}] ++
- rabbit_misc:filter_exit_map(
- fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
- [{vm_memory_high_watermark, {vm_memory_monitor,
- get_vm_memory_high_watermark, []}},
- {vm_memory_limit, {vm_memory_monitor,
- get_memory_limit, []}}]).
+ S1 = [{pid, list_to_integer(os:getpid())},
+ {running_applications, application:which_applications(infinity)},
+ {os, os:type()},
+ {erlang_version, erlang:system_info(system_version)},
+ {memory, erlang:memory()}],
+ S2 = rabbit_misc:filter_exit_map(
+ fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
+ [{vm_memory_high_watermark, {vm_memory_monitor,
+ get_vm_memory_high_watermark, []}},
+ {vm_memory_limit, {vm_memory_monitor,
+ get_memory_limit, []}}]),
+ S3 = rabbit_misc:with_exit_handler(
+ fun () -> [] end,
+ fun () -> [{file_descriptors, file_handle_cache:info()}] end),
+ S4 = [{processes, [{limit, erlang:system_info(process_limit)},
+ {used, erlang:system_info(process_count)}]},
+ {run_queue, erlang:statistics(run_queue)},
+ {uptime, begin
+ {T,_} = erlang:statistics(wall_clock),
+ T div 1000
+ end}],
+ S1 ++ S2 ++ S3 ++ S4.
is_running() -> is_running(node()).
@@ -348,10 +366,8 @@ rotate_logs(BinarySuffix) ->
start(normal, []) ->
case erts_version_check() of
ok ->
- ok = rabbit_mnesia:delete_previously_running_nodes(),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
-
print_banner(),
[ok = run_boot_step(Step) || Step <- boot_steps()],
io:format("~nbroker running~n"),
@@ -430,8 +446,7 @@ run_boot_step({StepName, Attributes}) ->
[try
apply(M,F,A)
catch
- _:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n",
- [Reason, erlang:get_stacktrace()])
+ _:Reason -> boot_step_error(Reason, erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
io:format("done~n"),
ok
@@ -490,8 +505,27 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
+boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+ {Err, Nodes} =
+ case rabbit_mnesia:read_previously_running_nodes() of
+ [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
+ " shut down forcefully~nit cannot determine which nodes"
+ " are timing out. Details on all nodes will~nfollow.~n",
+ rabbit_mnesia:all_clustered_nodes() -- [node()]};
+ Ns -> {rabbit_misc:format(
+ "Timeout contacting cluster nodes: ~p.~n", [Ns]),
+ Ns}
+ end,
+ boot_error(Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []);
+
+boot_step_error(Reason, Stacktrace) ->
+ boot_error("Error description:~n ~p~n~n"
+ "Log files (may contain more information):~n ~s~n ~s~n~n"
+ "Stack trace:~n ~p~n~n",
+ [Reason, log_location(kernel), log_location(sasl), Stacktrace]).
+
boot_error(Format, Args) ->
- io:format("BOOT ERROR: " ++ Format, Args),
+ io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
error_logger:error_msg(Format, Args),
timer:sleep(1000),
exit({?MODULE, failure_during_boot}).
@@ -645,7 +679,7 @@ print_banner() ->
{"app descriptor", app_location()},
{"home dir", home_dir()},
{"config file(s)", config_files()},
- {"cookie hash", rabbit_misc:cookie_hash()},
+ {"cookie hash", rabbit_nodes:cookie_hash()},
{"log", log_location(kernel)},
{"sasl log", log_location(sasl)},
{"database dir", rabbit_mnesia:dir()},
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index ca28d68637..75c5351182 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_access_control).
@@ -66,7 +66,6 @@ check_user_login(Username, AuthProps) ->
check_vhost_access(User = #user{ username = Username,
auth_backend = Module }, VHostPath) ->
- ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]),
check_access(
fun() ->
rabbit_vhost:exists(VHostPath) andalso
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index fd03ca85b3..187ec1ab6c 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_alarm).
@@ -31,10 +31,9 @@
-ifdef(use_specs).
--type(mfa_tuple() :: {atom(), atom(), list()}).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
+-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()).
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 96017df812..a7dfd535c8 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue).
@@ -20,12 +20,12 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, requeue/3, ack/3, reject/4]).
+ stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, unblock/2, flush_all/2]).
+-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([store_queue/1]).
@@ -40,21 +40,23 @@
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
+-define(MORE_CONSUMER_CREDIT_AFTER, 50).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([name/0, qmsg/0]).
+-export_type([name/0, qmsg/0, routing_result/0]).
-type(name() :: rabbit_types:r('queue')).
-
+-type(qpids() :: [pid()]).
-type(qlen() :: rabbit_types:ok(non_neg_integer())).
-type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return())).
-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}).
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-
+-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
-spec(start/0 :: () -> [name()]).
@@ -69,7 +71,8 @@
-> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
- rabbit_types:error('not_found')).
+ rabbit_types:error('not_found');
+ ([name()]) -> [rabbit_types:amqqueue()]).
-spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')).
-spec(with_or_die/2 ::
(name(), qfun(A)) -> A | rabbit_types:channel_exit()).
@@ -117,12 +120,15 @@
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
--spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()).
+-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
+-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
--spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
--spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) ->
+-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
+-spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) ->
ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
@@ -133,8 +139,9 @@
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
+-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit() |
@@ -264,6 +271,10 @@ add_default_binding(#amqqueue{name = QueueName}) ->
key = RoutingKey,
args = []}).
+lookup(Names) when is_list(Names) ->
+ %% Normally we'd call mnesia:dirty_read/1 here, but that is quite
+ %% expensive for reasons explained in rabbit_misc:dirty_read/1.
+ lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]);
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
@@ -419,14 +430,9 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
-deliver(QPid, Delivery = #delivery{immediate = true}) ->
- gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
-deliver(QPid, Delivery = #delivery{mandatory = true}) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity),
- true;
-deliver(QPid, Delivery) ->
- gen_server2:cast(QPid, {deliver, Delivery}),
- true.
+deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
+
+deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}).
@@ -458,7 +464,21 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
- gen_server2:cast(QPid, {notify_sent, ChPid}).
+ Key = {consumer_credit_to, QPid},
+ put(Key, case get(Key) of
+ 1 -> gen_server2:cast(
+ QPid, {notify_sent, ChPid,
+ ?MORE_CONSUMER_CREDIT_AFTER}),
+ ?MORE_CONSUMER_CREDIT_AFTER;
+ undefined -> erlang:monitor(process, QPid),
+ ?MORE_CONSUMER_CREDIT_AFTER - 1;
+ C -> C - 1
+ end),
+ ok.
+
+notify_sent_queue_down(QPid) ->
+ erase({consumer_credit_to, QPid}),
+ ok.
unblock(QPid, ChPid) ->
delegate_cast(QPid, {unblock, ChPid}).
@@ -518,6 +538,49 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
+deliver([], #delivery{mandatory = false, immediate = false}, _Flow) ->
+ %% /dev/null optimisation
+ {routed, []};
+
+deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
+ %% optimisation: when Mandatory = false and Immediate = false,
+ %% rabbit_amqqueue:deliver will deliver the message to the queue
+ %% process asynchronously, and return true, which means all the
+ %% QPids will always be returned. It is therefore safe to use a
+ %% fire-and-forget cast here and return the QPids - the semantics
+ %% is preserved. This scales much better than the non-immediate
+ %% case below.
+ QPids = qpids(Qs),
+ case Flow of
+ flow -> [credit_flow:send(QPid) || QPid <- QPids];
+ noflow -> ok
+ end,
+ delegate:invoke_no_result(
+ QPids, fun (QPid) ->
+ gen_server2:cast(QPid, {deliver, Delivery, Flow})
+ end),
+ {routed, QPids};
+
+deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate},
+ _Flow) ->
+ QPids = qpids(Qs),
+ {Success, _} =
+ delegate:invoke(
+ QPids, fun (QPid) ->
+ gen_server2:call(QPid, {deliver, Delivery}, infinity)
+ end),
+ case {Mandatory, Immediate,
+ lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
+ ({_, false}, {_, H}) -> {true, H}
+ end, {false, []}, Success)} of
+ {true, _ , {false, []}} -> {unroutable, []};
+ {_ , true, {_ , []}} -> {not_delivered, []};
+ {_ , _ , {_ , R}} -> {routed, R}
+ end.
+
+qpids(Qs) -> lists:append([[QPid | SPids] ||
+ #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
+
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->
rabbit_misc:with_exit_handler(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ba20b35524..12cd0c93ff 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_process).
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 100).
+-define(UNSENT_MESSAGE_LIMIT, 200).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
@@ -115,7 +115,6 @@ info_keys() -> ?INFO_KEYS.
%%----------------------------------------------------------------------------
init(Q) ->
- ?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
State = #q{q = Q#amqqueue{pid = self()},
@@ -135,7 +134,6 @@ init(Q) ->
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
RateTRef, AckTags, Deliveries, MTC) ->
- ?LOGDEBUG("Queue starting - ~p~n", [Q]),
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -390,34 +388,32 @@ ch_record_state_transition(OldCR, NewCR) ->
{_, _} -> ok
end.
-deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
+deliver_msgs_to_consumers(_DeliverFun, true, State) ->
+ {true, State};
+deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
- case PredFun(FunAcc, State) of
- false -> {FunAcc, State};
- true -> case queue:out(ActiveConsumers) of
- {empty, _} ->
- {FunAcc, State};
- {{value, QEntry}, Tail} ->
- {FunAcc1, State1} =
- deliver_msg_to_consumer(
+ case queue:out(ActiveConsumers) of
+ {empty, _} ->
+ {false, State};
+ {{value, QEntry}, Tail} ->
+ {Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry,
- FunAcc, State#q{active_consumers = Tail}),
- deliver_msgs_to_consumers(Funs, FunAcc1, State1)
- end
+ State#q{active_consumers = Tail}),
+ deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) ->
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
C = ch_record(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
- {FunAcc, State};
+ {false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
Consumer#consumer.ack_required) of
false -> block_consumer(C#cr{is_limit_active = true}, E),
- {FunAcc, State};
+ {false, State};
true -> AC1 = queue:in(E, State#q.active_consumers),
deliver_msg_to_consumer(
- DeliverFun, Consumer, C, FunAcc,
+ DeliverFun, Consumer, C,
State#q{active_consumers = AC1})
end
end.
@@ -428,9 +424,9 @@ deliver_msg_to_consumer(DeliverFun,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
- FunAcc, State = #q{q = #amqqueue{name = QName}}) ->
- {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
- DeliverFun(AckRequired, FunAcc, State),
+ State = #q{q = #amqqueue{name = QName}}) ->
+ {{Message, IsDelivered, AckTag}, Stop, State1} =
+ DeliverFun(AckRequired, State),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
@@ -439,11 +435,9 @@ deliver_msg_to_consumer(DeliverFun,
end,
update_ch_record(C#cr{acktags = ChAckTags1,
unsent_message_count = Count + 1}),
- {FunAcc1, State1}.
-
-deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
+ {Stop, State1}.
-deliver_from_queue_deliver(AckRequired, false, State) ->
+deliver_from_queue_deliver(AckRequired, State) ->
{{Message, IsDelivered, AckTag, Remaining}, State1} =
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
@@ -487,12 +481,11 @@ maybe_record_confirm_message(_Confirm, State) ->
State.
run_message_queue(State) ->
- Funs = {fun deliver_from_queue_pred/2,
- fun deliver_from_queue_deliver/3},
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(State),
- IsEmpty = BQ:is_empty(BQS),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
+ {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+ fun deliver_from_queue_deliver/2,
+ BQ:is_empty(BQS), State1),
State2.
attempt_delivery(Delivery = #delivery{sender = ChPid,
@@ -506,10 +499,8 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
end,
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
- PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
- fun (AckRequired, false,
- State1 = #q{backing_queue_state = BQS2}) ->
+ fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
%% we don't need an expiry here because
%% messages are not being enqueued, so we use
%% an empty message_properties.
@@ -523,7 +514,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
State1#q{backing_queue_state = BQS3}}
end,
{Delivered, State2} =
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false,
+ deliver_msgs_to_consumers(DeliverFun, false,
State#q{backing_queue_state = BQS1}),
{Delivered, Confirm, State2};
{Duplicate, BQS1} ->
@@ -598,6 +589,12 @@ should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
+ case get({ch_publisher, DownPid}) of
+ undefined -> ok;
+ MRef -> erlang:demonitor(MRef),
+ erase({ch_publisher, DownPid}),
+ credit_flow:peer_down(DownPid)
+ end,
case lookup_ch(DownPid) of
not_found ->
{ok, State};
@@ -713,15 +710,12 @@ infos(Items, State) ->
|| Item <- (Items1 -- [synchronised_slave_pids])].
slaves_status(#q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} =
- rabbit_amqqueue:lookup(Name),
- case MNodes of
- undefined ->
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, #amqqueue{mirror_nodes = undefined}} ->
[{slave_pids, ''}, {synchronised_slave_pids, ''}];
- _ ->
+ {ok, #amqqueue{slave_pids = SPids}} ->
{Results, _Bad} =
- delegate:invoke(
- SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
+ delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1),
{SPids1, SSPids} =
lists:foldl(
fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
@@ -765,11 +759,9 @@ i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{mirror_nodes = MNodes,
- slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
- case MNodes of
- undefined -> [];
- _ -> SPids
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, #amqqueue{mirror_nodes = undefined}} -> [];
+ {ok, #amqqueue{slave_pids = SPids}} -> SPids
end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
@@ -826,7 +818,7 @@ prioritise_cast(Msg, _State) ->
{set_maximum_since_use, _Age} -> 8;
{ack, _AckTags, _ChPid} -> 7;
{reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
+ {notify_sent, _ChPid, _Credit} -> 7;
{unblock, _ChPid} -> 7;
{run_backing_queue, _Mod, _Fun} -> 6;
_ -> 0
@@ -877,9 +869,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery}, _From, State) ->
- %% Synchronous, "immediate" delivery mode
- %%
+handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) ->
%% FIXME: Is this correct semantics?
%%
%% I'm worried in particular about the case where an exchange has
@@ -897,8 +887,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
false -> discard_delivery(Delivery, State1)
end);
-handle_call({deliver, Delivery}, From, State) ->
- %% Synchronous, "mandatory" delivery mode. Reply asap.
+handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) ->
gen_server2:reply(From, true),
noreply(deliver_or_enqueue(Delivery, State));
@@ -1021,8 +1010,17 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ case Flow of
+ flow -> Key = {ch_publisher, Sender},
+ case get(Key) of
+ undefined -> put(Key, erlang:monitor(process, Sender));
+ _ -> ok
+ end,
+ credit_flow:ack(Sender);
+ noflow -> ok
+ end,
noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, AckTags, ChPid}, State) ->
@@ -1054,11 +1052,11 @@ handle_cast({unblock, ChPid}, State) ->
possibly_unblock(State, ChPid,
fun (C) -> C#cr{is_limit_active = false} end));
-handle_cast({notify_sent, ChPid}, State) ->
+handle_cast({notify_sent, ChPid, Credit}, State) ->
noreply(
possibly_unblock(State, ChPid,
fun (C = #cr{unsent_message_count = Count}) ->
- C#cr{unsent_message_count = Count - 1}
+ C#cr{unsent_message_count = Count - Credit}
end));
handle_cast({limit, ChPid, Limiter}, State) ->
@@ -1102,8 +1100,7 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
handle_info(maybe_expire, State) ->
case is_unused(State) of
- true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
- {stop, normal, State};
+ true -> {stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
end;
@@ -1150,8 +1147,11 @@ handle_info(timeout, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(Info, State) ->
- ?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 7b3ebcf266..a4305e5f5b 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_sup).
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl
index ade158bb8c..e0e252b898 100644
--- a/src/rabbit_auth_backend.erl
+++ b/src/rabbit_auth_backend.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_backend).
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 086a90b49f..3ef81d3208 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_backend_internal).
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index 897199ee78..0c8251b803 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism).
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
index b8682a465e..3de6e7a679 100644
--- a/src/rabbit_auth_mechanism_amqplain.erl
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_amqplain).
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
index acbb6e48f6..64b01d8e5d 100644
--- a/src/rabbit_auth_mechanism_cr_demo.erl
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_cr_demo).
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index 2448acb660..19fb587567 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_plain).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index c3b322ee6d..364eb8f646 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_backing_queue).
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index c61184a601..7b00fa5f13 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_backing_queue_qc).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index b266d3664d..b8211d4332 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_basic).
@@ -29,7 +29,7 @@
-type(properties_input() ::
(rabbit_framing:amqp_property_record() | [{atom(), any()}])).
-type(publish_result() ::
- ({ok, rabbit_router:routing_result(), [pid()]}
+ ({ok, rabbit_amqqueue:routing_result(), [pid()]}
| rabbit_types:error('not_found'))).
-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
@@ -88,8 +88,8 @@ publish(Delivery = #delivery{
end.
publish(X, Delivery) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
+ Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)),
+ {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery),
{ok, RoutingRes, DeliveredQPids}.
delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
@@ -139,7 +139,7 @@ message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
{ok, #basic_message{
exchange_name = XName,
content = strip_header(DecodedContent, ?DELETED_HEADER),
- id = rabbit_guid:guid(),
+ id = rabbit_guid:gen(),
is_persistent = is_message_persistent(DecodedContent),
routing_keys = [RoutingKey |
header_routes(Props#'P_basic'.headers)]}}
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 494f3203c8..d69376fb75 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_generator).
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index f3ca4e9897..5f0016b60b 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_parser).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index e625a427e4..bb44797e4d 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binding).
@@ -277,6 +277,7 @@ has_for_source(SrcName) ->
contains(rabbit_semi_durable_route, Match).
remove_for_source(SrcName) ->
+ lock_route_tables(),
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
Routes = lists:usort(
mnesia:match_object(rabbit_route, Match, write) ++
@@ -351,7 +352,28 @@ continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
+%% For bulk operations we lock the tables we are operating on in order
+%% to reduce the time complexity. Without the table locks we end up
+%% with num_tables*num_bulk_bindings row-level locks. Taking each lock
+%% takes time proportional to the number of existing locks, thus
+%% resulting in O(num_bulk_bindings^2) complexity.
+%%
+%% The locks need to be write locks since ultimately we end up
+%% removing all these rows.
+%%
+%% The downside of all this is that no other binding operations except
+%% lookup/routing (which uses dirty ops) can take place
+%% concurrently. However, that is the case already since the bulk
+%% operations involve mnesia:match_object calls with a partial key,
+%% which entails taking a table lock.
+lock_route_tables() ->
+ [mnesia:lock({table, T}, write) || T <- [rabbit_route,
+ rabbit_reverse_route,
+ rabbit_semi_durable_route,
+ rabbit_durable_route]].
+
remove_for_destination(DstName, DeleteFun) ->
+ lock_route_tables(),
Match = reverse_route(
#route{binding = #binding{destination = DstName, _ = '_'}}),
ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9b2fe28ce8..a101886f26 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel).
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
@@ -33,9 +33,9 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter, tx_status, next_tag,
- unacked_message_q, uncommitted_message_q, uncommitted_acks,
- user, virtual_host, most_recently_declared_queue, queue_monitors,
+ limiter, tx_status, next_tag, unacked_message_q,
+ uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
+ virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
unconfirmed_qm, confirmed, capabilities, trace_state}).
@@ -78,6 +78,8 @@
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
+-spec(do_flow/3 :: (pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) -> 'ok').
-spec(flush/1 :: (pid()) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
@@ -111,7 +113,11 @@ do(Pid, Method) ->
do(Pid, Method, none).
do(Pid, Method, Content) ->
- gen_server2:cast(Pid, {method, Method, Content}).
+ gen_server2:cast(Pid, {method, Method, Content, noflow}).
+
+do_flow(Pid, Method, Content) ->
+ credit_flow:send(Pid),
+ gen_server2:cast(Pid, {method, Method, Content, flow}).
flush(Pid) ->
gen_server2:call(Pid, flush, infinity).
@@ -185,10 +191,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
unacked_message_q = queue:new(),
uncommitted_message_q = queue:new(),
uncommitted_acks = [],
+ uncommitted_nacks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- queue_monitors = dict:new(),
+ queue_monitors = sets:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
@@ -244,7 +251,12 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
handle_call(_Request, _From, State) ->
noreply(State).
-handle_cast({method, Method, Content}, State) ->
+handle_cast({method, Method, Content, Flow},
+ State = #ch{reader_pid = Reader}) ->
+ case Flow of
+ flow -> credit_flow:ack(Reader);
+ noflow -> ok
+ end,
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
@@ -284,29 +296,19 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
- State1 = lock_message(AckRequired,
- ack_record(DeliveryTag, ConsumerTag, Msg),
- State),
-
- M = #'basic.deliver'{consumer_tag = ConsumerTag,
- delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
- rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
- rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State3#ch{next_tag = DeliveryTag + 1});
-
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag}) ->
+ ok = rabbit_writer:send_command_and_notify(
+ WriterPid, QPid, self(),
+ #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ Content),
+ noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -315,6 +317,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(timeout, State) ->
noreply(State);
@@ -327,9 +333,10 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State2 = queue_blocked(QPid, State1),
State3 = handle_consuming_queue_down(QPid, State2),
+ credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
noreply(State3#ch{queue_monitors =
- dict:erase(QPid, State3#ch.queue_monitors)});
+ sets:del_element(QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -527,7 +534,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
#'channel.flow_ok'{active = false});
_ -> ok
end,
- demonitor_queue(QPid, State#ch{blocking = Blocking1})
+ State#ch{blocking = Blocking1}
end.
record_confirm(undefined, _, State) ->
@@ -565,8 +572,7 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
case gb_sets:is_empty(MsgSeqNos1) of
true -> UQM1 = gb_trees:delete(QPid, UQM),
- demonitor_queue(
- QPid, State#ch{unconfirmed_qm = UQM1});
+ State#ch{unconfirmed_qm = UQM1};
false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
State#ch{unconfirmed_qm = UQM1}
end;
@@ -672,45 +678,36 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
case TxStatus of
- none -> ack(Acked, State1);
+ none -> ack(Acked, State1),
+ State1;
in_progress -> State1#ch{uncommitted_acks =
Acked ++ State1#ch.uncommitted_acks}
end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- conn_pid = ConnPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
+ _, State = #ch{writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}} ->
- State1 = lock_message(not(NoAck),
- ack_record(DeliveryTag, none, Msg),
- State),
- State2 = maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
- rabbit_trace:tap_trace_out(Msg, TraceState),
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}} ->
ok = rabbit_writer:send_command(
WriterPid,
- #'basic.get_ok'{delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey,
+ #'basic.get_ok'{delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State3#ch{next_tag = DeliveryTag + 1}};
+ {noreply, record_sent(none, not(NoAck), Msg, State)};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -730,7 +727,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
- <<>> -> rabbit_guid:binstring_guid("amq.ctag");
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.ctag");
Other -> Other
end,
@@ -787,9 +785,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
false -> dict:store(QPid, CTags1, QCons)
end
end,
- NewState = demonitor_queue(
- Q, State#ch{consumer_mapping = ConsumerMapping1,
- queue_consumers = QCons1}),
+ NewState = State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QCons1},
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -960,7 +957,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
false -> none
end,
ActualNameBin = case QueueNameBin of
- <<>> -> rabbit_guid:binstring_guid("amq.gen");
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.gen");
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
@@ -1068,19 +1066,26 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL}) ->
- State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2,
- State, TMQ))),
- {noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
+handle_method(#'tx.commit'{}, _,
+ State = #ch{uncommitted_message_q = TMQ,
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL,
+ limiter = Limiter}) ->
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
+ ack(TAL, State1),
+ lists:foreach(
+ fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL),
+ {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_acks = TAL}) ->
- UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))),
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL}) ->
+ TNL1 = lists:append([L || {_, L} <- TNL]),
+ UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
@@ -1111,10 +1116,7 @@ handle_method(#'channel.flow'{active = false}, _,
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> State2 = lists:foldl(fun monitor_queue/2,
- State1#ch{blocking =
- sets:from_list(QPids)},
- QPids),
+ QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)},
ok = rabbit_amqqueue:flush_all(QPids, self()),
{noreply, State2}
end;
@@ -1145,31 +1147,12 @@ consumer_monitor(ConsumerTag,
end.
monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (not dict:is_key(QPid, QMons) andalso
- queue_monitor_needed(QPid, State)) of
- true -> MRef = erlang:monitor(process, QPid),
- State#ch{queue_monitors = dict:store(QPid, MRef, QMons)};
- false -> State
- end.
-
-demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (dict:is_key(QPid, QMons) andalso
- not queue_monitor_needed(QPid, State)) of
- true -> true = erlang:demonitor(dict:fetch(QPid, QMons)),
- State#ch{queue_monitors = dict:erase(QPid, QMons)};
+ case not sets:is_element(QPid, QMons) of
+ true -> erlang:monitor(process, QPid),
+ State#ch{queue_monitors = sets:add_element(QPid, QMons)};
false -> State
end.
-queue_monitor_needed(QPid, #ch{queue_consumers = QCons,
- blocking = Blocking,
- unconfirmed_qm = UQM} = State) ->
- StatsEnabled = rabbit_event:stats_level(
- State, #ch.stats_timer) =:= fine,
- ConsumerMonitored = dict:is_key(QPid, QCons),
- QueueBlocked = sets:is_element(QPid, Blocking),
- ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
- StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
-
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
@@ -1266,18 +1249,46 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
-reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
+reject(DeliveryTag, Requeue, Multiple,
+ State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ State1 = State#ch{unacked_message_q = Remaining},
+ {noreply,
+ case TxStatus of
+ none ->
+ reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ in_progress ->
+ State1#ch{uncommitted_nacks =
+ [{Requeue, Acked} | State1#ch.uncommitted_nacks]}
+ end}.
+
+reject(Requeue, Acked, Limiter) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
end, ok, Acked),
- ok = notify_limiter(State#ch.limiter, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}}.
-
-ack_record(DeliveryTag, ConsumerTag,
- _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
- {DeliveryTag, ConsumerTag, {QPid, MsgId}}.
+ ok = notify_limiter(Limiter, Acked).
+
+record_sent(ConsumerTag, AckRequired,
+ Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ State = #ch{unacked_message_q = UAMQ,
+ next_tag = DeliveryTag,
+ trace_state = TraceState}) ->
+ maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ rabbit_trace:tap_trace_out(Msg, TraceState),
+ UAMQ1 = case AckRequired of
+ true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
+ UAMQ);
+ false -> UAMQ
+ end,
+ State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
collect_acks(Q, 0, true) ->
{queue:to_list(Q), queue:new()};
@@ -1312,7 +1323,8 @@ ack(Acked, State) ->
maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_acks = []}.
+ uncommitted_acks = [],
+ uncommitted_nacks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1362,22 +1374,25 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
msg_seq_no = MsgSeqNo},
QNames}, State) ->
- {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery),
- State1 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State),
+ {RoutingRes, DeliveredQPids} =
+ rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
+ State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message, State1),
maybe_incr_stats([{XName, 1} |
[{{QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State1).
+ QPid <- DeliveredQPids]], publish, State2),
+ State2.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State));
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{XName, 1}], return_not_delivered, State));
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
@@ -1395,15 +1410,10 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State0#ch{unconfirmed_qm = UQM1};
none ->
UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
- monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1})
+ State0#ch{unconfirmed_qm = UQM1}
end
end, State#ch{unconfirmed_mq = UMQ1}, QPids).
-lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
- State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
-lock_message(false, _MsgStruct, State) ->
- State.
-
send_nacks([], State) ->
State;
send_nacks(MXs, State = #ch{tx_status = none}) ->
@@ -1419,13 +1429,12 @@ send_nacks(_, State) ->
send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
State;
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
- {MsgSeqNos, State1} =
- lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) ->
- {[MsgSeqNo | MSNs],
- maybe_incr_stats([{ExchangeName, 1}], confirm,
- State0)}
- end, {[], State}, lists:append(C)),
- send_confirms(MsgSeqNos, State1 #ch{confirmed = []});
+ MsgSeqNos =
+ lists:foldl(fun ({MsgSeqNo, XName}, MSNs) ->
+ maybe_incr_stats([{XName, 1}], confirm, State),
+ [MsgSeqNo | MSNs]
+ end, [], lists:append(C)),
+ send_confirms(MsgSeqNos, State#ch{confirmed = []});
send_confirms(State) ->
maybe_complete_tx(State).
@@ -1505,26 +1514,21 @@ i(Item, _) ->
maybe_incr_redeliver_stats(true, QPid, State) ->
maybe_incr_stats([{QPid, 1}], redeliver, State);
-maybe_incr_redeliver_stats(_, _, State) ->
- State.
+maybe_incr_redeliver_stats(_, _, _State) ->
+ ok.
maybe_incr_stats(QXIncs, Measure, State) ->
case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> lists:foldl(fun ({QX, Inc}, State0) ->
- incr_stats(QX, Inc, Measure, State0)
- end, State, QXIncs);
- _ -> State
+ fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
+ _ -> ok
end.
-incr_stats({QPid, _} = QX, Inc, Measure, State) ->
- update_measures(queue_exchange_stats, QX, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) ->
- update_measures(queue_stats, QPid, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(X, Inc, Measure, State) ->
- update_measures(exchange_stats, X, Inc, Measure),
- State.
+incr_stats({_, _} = QX, Inc, Measure) ->
+ update_measures(queue_exchange_stats, QX, Inc, Measure);
+incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
+ update_measures(queue_stats, QPid, Inc, Measure);
+incr_stats(X, Inc, Measure) ->
+ update_measures(exchange_stats, X, Inc, Measure).
update_measures(Type, QX, Inc, Measure) ->
Measures = case get({Type, QX}) of
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index a19b6bfdad..dc262b4948 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel_sup).
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index e2561c802e..995c41fbc2 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel_sup_sup).
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index dfb400e37f..c508f1b996 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_client_sup).
@@ -28,8 +28,9 @@
-ifdef(use_specs).
--spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()).
--spec(start_link/2 :: ({'local', atom()}, mfa()) ->
+-spec(start_link/1 :: (rabbit_types:mfargs()) ->
+ rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: ({'local', atom()}, rabbit_types:mfargs()) ->
rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
index a0953eab95..adf6e41702 100644
--- a/src/rabbit_command_assembler.erl
+++ b/src/rabbit_command_assembler.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_command_assembler).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index b2aba2eeb0..12a532b6fc 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_connection_sup).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 20486af52b..6a775adfb0 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -11,13 +11,13 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5, diagnostics/1]).
+-export([start/0, stop/0, action/5]).
-define(RPC_TIMEOUT, infinity).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -49,7 +49,6 @@
(atom(), node(), [string()], [{string(), any()}],
fun ((string(), [any()]) -> 'ok'))
-> 'ok').
--spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]).
-spec(usage/0 :: () -> no_return()).
-endif.
@@ -67,7 +66,7 @@ start() ->
CmdArgsAndOpts -> CmdArgsAndOpts
end,
Opts1 = [case K of
- ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
+ ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
_ -> {K, V}
end || {K, V} <- Opts],
Command = list_to_atom(Command0),
@@ -79,6 +78,12 @@ start() ->
io:format(Format ++ " ...~n", Args1)
end
end,
+ PrintInvalidCommandError =
+ fun () ->
+ print_error("invalid command '~s'",
+ [string:join([atom_to_list(Command) | Args], " ")])
+ end,
+
%% The reason we don't use a try/catch here is that rpc:call turns
%% thrown errors into normal return values
case catch action(Command, Node, Args, Opts, Inform) of
@@ -88,9 +93,11 @@ start() ->
false -> io:format("...done.~n")
end,
rabbit_misc:quit(0);
- {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- print_error("invalid command '~s'",
- [string:join([atom_to_list(Command) | Args], " ")]),
+ {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15
+ PrintInvalidCommandError(),
+ usage();
+ {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> %% >= R15
+ PrintInvalidCommandError(),
usage();
{'EXIT', {badarg, _}} ->
print_error("invalid parameter: ~p", [Args]),
@@ -135,26 +142,7 @@ print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) ->
print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
print_badrpc_diagnostics(Node) ->
- [fmt_stderr(Fmt, Args) || {Fmt, Args} <- diagnostics(Node)].
-
-diagnostics(Node) ->
- {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- [{"diagnostics:", []},
- case net_adm:names(NodeHost) of
- {error, EpmdReason} ->
- {"- unable to connect to epmd on ~s: ~w",
- [NodeHost, EpmdReason]};
- {ok, NamePorts} ->
- {"- nodes and their ports on ~s: ~p",
- [NodeHost, [{list_to_atom(Name), Port} ||
- {Name, Port} <- NamePorts]]}
- end,
- {"- current node: ~w", [node()]},
- case init:get_argument(home) of
- {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
- Other -> {"- no current node home dir: ~p", [Other]}
- end,
- {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}].
+ fmt_stderr(rabbit_nodes:diagnostics([Node]), []).
stop() ->
ok.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 6f9a46504f..e2928cae7e 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_direct).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 6e29ace71a..f1672f4e7b 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_error_logger).
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 7b6e07c19f..042ab23cbc 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_error_logger_file_h).
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 5ae40c784a..4ec141cfd8 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_event).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a15b9be490..83e28c44a8 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange).
@@ -355,11 +355,21 @@ peek_serial(XName) ->
_ -> undefined
end.
+invalid_module(T) ->
+ rabbit_log:warning(
+ "Could not find exchange type ~s.~n", [T]),
+ put({xtype_to_module, T}, rabbit_exchange_type_invalid),
+ rabbit_exchange_type_invalid.
+
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
case get({xtype_to_module, T}) of
- undefined -> {ok, Module} = rabbit_registry:lookup_module(exchange, T),
- put({xtype_to_module, T}, Module),
- Module;
- Module -> Module
+ undefined ->
+ case rabbit_registry:lookup_module(exchange, T) of
+ {ok, Module} -> put({xtype_to_module, T}, Module),
+ Module;
+ {error, not_found} -> invalid_module(T)
+ end;
+ Module ->
+ Module
end.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index ab3d00dc28..44a08e2437 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type).
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index b485e31f33..4bce42d4da 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_direct).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 3c02972278..cc3fb87c21 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_fanout).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index f09e4aae73..de9979b422 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_headers).
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
new file mode 100644
index 0000000000..8f60f7d898
--- /dev/null
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -0,0 +1,47 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_exchange_type_invalid).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, serialise_events/0, route/2]).
+-export([validate/1, create/2, delete/3,
+ add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+-include("rabbit_exchange_type_spec.hrl").
+
+description() ->
+ [{name, <<"invalid">>},
+ {description,
+ <<"Dummy exchange type, to be used when the intended one is not found.">>
+ }].
+
+serialise_events() -> false.
+
+route(#exchange{name = Name, type = Type}, _) ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "Cannot route message through ~s: exchange type ~s not found",
+ [rabbit_misc:rs(Name), Type]).
+
+validate(_X) -> ok.
+create(_Tx, _X) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 348655b101..84f4f8a9de 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_topic).
@@ -52,6 +52,7 @@ validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(transaction, #exchange{name = X}, _Bs) ->
+ trie_remove_all_nodes(X),
trie_remove_all_edges(X),
trie_remove_all_bindings(X),
ok;
@@ -63,59 +64,26 @@ add_binding(transaction, _Exchange, Binding) ->
add_binding(none, _Exchange, _Binding) ->
ok.
-remove_bindings(transaction, #exchange{name = X}, Bs) ->
- %% The remove process is split into two distinct phases. In the
- %% first phase we gather the lists of bindings and edges to
- %% delete, then in the second phase we process all the
- %% deletions. This is to prevent interleaving of read/write
- %% operations in mnesia that can adversely affect performance.
- {ToDelete, Paths} =
- lists:foldl(
- fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) ->
- Path = [{FinalNode, _} | _] =
- follow_down_get_path(S, split_topic_key(K)),
- {[{FinalNode, D} | Acc],
- decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))}
- end, {[], gb_trees:empty()}, Bs),
-
- [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete],
- [trie_remove_edge(X, Parent, Node, W) ||
- {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
+remove_bindings(transaction, _X, Bs) ->
+ %% See rabbit_binding:lock_route_tables for the rationale for
+ %% taking table locks.
+ case Bs of
+ [_] -> ok;
+ _ -> [mnesia:lock({table, T}, write) ||
+ T <- [rabbit_topic_trie_node,
+ rabbit_topic_trie_edge,
+ rabbit_topic_trie_binding]]
+ end,
+ [begin
+ Path = [{FinalNode, _} | _] =
+ follow_down_get_path(X, split_topic_key(K)),
+ trie_remove_binding(X, FinalNode, D),
+ remove_path_if_empty(X, Path)
+ end || #binding{source = X, key = K, destination = D} <- Bs],
ok;
remove_bindings(none, _X, _Bs) ->
ok.
-maybe_add_path(_X, [{root, none}], PathAcc) ->
- PathAcc;
-maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) ->
- case gb_trees:is_defined(Node, PathAcc) of
- true -> PathAcc;
- false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node),
- trie_child_count(X, Node)}},
- PathAcc)
- end.
-
-decrement_bindings(X, Path, PathAcc) ->
- with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end,
- Path, PathAcc).
-
-decrement_edges(X, Path, PathAcc) ->
- with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end,
- Path, PathAcc).
-
-with_path_acc(_X, _Fun, [{root, none}], PathAcc) ->
- PathAcc;
-with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) ->
- {Parent, W, Counts} = gb_trees:get(Node, PathAcc),
- NewCounts = Fun(Counts),
- NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc),
- case NewCounts of
- {0, 0} -> decrement_edges(X, ParentPath,
- maybe_add_path(X, ParentPath, NewPathAcc));
- _ -> NewPathAcc
- end.
-
-
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
@@ -183,6 +151,16 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
error -> {error, Acc, Words}
end.
+remove_path_if_empty(_, [{root, none}]) ->
+ ok;
+remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
+ case mnesia:read(rabbit_topic_trie_node,
+ #trie_node{exchange_name = X, node_id = Node}, write) of
+ [] -> trie_remove_edge(X, Parent, Node, W),
+ remove_path_if_empty(X, RestPath);
+ _ -> ok
+ end.
+
trie_child(X, Node, Word) ->
case mnesia:read({rabbit_topic_trie_edge,
#trie_edge{exchange_name = X,
@@ -199,10 +177,30 @@ trie_bindings(X, Node) ->
destination = '$1'}},
mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
+trie_update_node_counts(X, Node, Field, Delta) ->
+ E = case mnesia:read(rabbit_topic_trie_node,
+ #trie_node{exchange_name = X,
+ node_id = Node}, write) of
+ [] -> #topic_trie_node{trie_node = #trie_node{
+ exchange_name = X,
+ node_id = Node},
+ edge_count = 0,
+ binding_count = 0};
+ [E0] -> E0
+ end,
+ case setelement(Field, E, element(Field, E) + Delta) of
+ #topic_trie_node{edge_count = 0, binding_count = 0} ->
+ ok = mnesia:delete_object(rabbit_topic_trie_node, E, write);
+ EN ->
+ ok = mnesia:write(rabbit_topic_trie_node, EN, write)
+ end.
+
trie_add_edge(X, FromNode, ToNode, W) ->
+ trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1),
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
trie_remove_edge(X, FromNode, ToNode, W) ->
+ trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1),
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
trie_edge_op(X, FromNode, ToNode, W, Op) ->
@@ -214,9 +212,11 @@ trie_edge_op(X, FromNode, ToNode, W, Op) ->
write).
trie_add_binding(X, Node, D) ->
+ trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1),
trie_binding_op(X, Node, D, fun mnesia:write/3).
trie_remove_binding(X, Node, D) ->
+ trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1),
trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
trie_binding_op(X, Node, D, Op) ->
@@ -227,23 +227,11 @@ trie_binding_op(X, Node, D, Op) ->
destination = D}},
write).
-trie_child_count(X, Node) ->
- count(rabbit_topic_trie_edge,
- #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
-
-trie_binding_count(X, Node) ->
- count(rabbit_topic_trie_binding,
- #topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
-
-count(Table, Match) ->
- length(mnesia:match_object(Table, Match, read)).
+trie_remove_all_nodes(X) ->
+ remove_all(rabbit_topic_trie_node,
+ #topic_trie_node{trie_node = #trie_node{exchange_name = X,
+ _ = '_'},
+ _ = '_'}).
trie_remove_all_edges(X) ->
remove_all(rabbit_topic_trie_edge,
@@ -262,7 +250,7 @@ remove_all(Table, Pattern) ->
mnesia:match_object(Table, Pattern, write)).
new_node_id() ->
- rabbit_guid:guid().
+ rabbit_guid:gen().
split_topic_key(Key) ->
split_topic_key(Key, [], []).
@@ -275,4 +263,3 @@ split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
split_topic_key(Rest, [C | RevWordAcc], RevResAcc).
-
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 5cb8e7b69d..59df14f318 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_file).
diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl
index da1a6a49ec..a79188abaf 100644
--- a/src/rabbit_framing.erl
+++ b/src/rabbit_framing.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% TODO auto-generate
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2d0f5014f2..f4c425ca20 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_guid).
@@ -19,7 +19,7 @@
-behaviour(gen_server).
-export([start_link/0]).
--export([guid/0, string_guid/1, binstring_guid/1]).
+-export([gen/0, gen_secure/0, string/2, binary/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -38,9 +38,10 @@
-type(guid() :: binary()).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(guid/0 :: () -> guid()).
--spec(string_guid/1 :: (any()) -> string()).
--spec(binstring_guid/1 :: (any()) -> binary()).
+-spec(gen/0 :: () -> guid()).
+-spec(gen_secure/0 :: () -> guid()).
+-spec(string/2 :: (guid(), any()) -> string()).
+-spec(binary/2 :: (guid(), any()) -> binary()).
-endif.
@@ -65,11 +66,8 @@ update_disk_serial() ->
end,
Serial.
-%% generate a GUID.
-%%
-%% The id is only unique within a single cluster and as long as the
-%% serial store hasn't been deleted.
-guid() ->
+%% Generate an un-hashed guid.
+fresh() ->
%% We don't use erlang:now() here because a) it may return
%% duplicates when the system clock has been rewound prior to a
%% restart, or ids were generated at a high rate (which causes
@@ -78,29 +76,74 @@ guid() ->
%%
%% A persisted serial number, the node, and a unique reference
%% (per node incarnation) uniquely identifies a process in space
- %% and time. We combine that with a process-local counter to give
- %% us a GUID.
- G = case get(guid) of
- undefined -> Serial = gen_server:call(?SERVER, serial, infinity),
- {{Serial, node(), make_ref()}, 0};
+ %% and time.
+ Serial = gen_server:call(?SERVER, serial, infinity),
+ {Serial, node(), make_ref()}.
+
+advance_blocks({B1, B2, B3, B4}, I) ->
+ %% To produce a new set of blocks, we create a new 32bit block
+ %% hashing {B5, I}. The new hash is used as last block, and the
+ %% other three blocks are XORed with it.
+ %%
+ %% Doing this is convenient because it avoids cascading conflits,
+ %% while being very fast. The conflicts are avoided by propagating
+ %% the changes through all the blocks at each round by XORing, so
+ %% the only occasion in which a collision will take place is when
+ %% all 4 blocks are the same and the counter is the same.
+ %%
+ %% The range (2^32) is provided explicitly since phash uses 2^27
+ %% by default.
+ B5 = erlang:phash2({B1, I}, 4294967296),
+ {{(B2 bxor B5), (B3 bxor B5), (B4 bxor B5), B5}, I+1}.
+
+blocks_to_binary({B1, B2, B3, B4}) -> <<B1:32, B2:32, B3:32, B4:32>>.
+
+%% generate a GUID. This function should be used when performance is a
+%% priority and predictability is not an issue. Otherwise use
+%% gen_secure/0.
+gen() ->
+ %% We hash a fresh GUID with md5, split it in 4 blocks, and each
+ %% time we need a new guid we rotate them producing a new hash
+ %% with the aid of the counter. Look at the comments in
+ %% advance_blocks/2 for details.
+ {BS, I} = case get(guid) of
+ undefined -> <<B1:32, B2:32, B3:32, B4:32>> =
+ erlang:md5(term_to_binary(fresh())),
+ {{B1,B2,B3,B4}, 0};
+ {BS0, I0} -> advance_blocks(BS0, I0)
+ end,
+ put(guid, {BS, I}),
+ blocks_to_binary(BS).
+
+%% generate a non-predictable GUID.
+%%
+%% The id is only unique within a single cluster and as long as the
+%% serial store hasn't been deleted.
+%%
+%% If you are not concerned with predictability, gen/0 is faster.
+gen_secure() ->
+ %% Here instead of hashing once we hash the GUID and the counter
+ %% each time, so that the GUID is not predictable.
+ G = case get(guid_secure) of
+ undefined -> {fresh(), 0};
{S, I} -> {S, I+1}
end,
- put(guid, G),
+ put(guid_secure, G),
erlang:md5(term_to_binary(G)).
%% generate a readable string representation of a GUID.
%%
%% employs base64url encoding, which is safer in more contexts than
%% plain base64.
-string_guid(Prefix) ->
+string(G, Prefix) ->
Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
($\/, Acc) -> [$\_ | Acc];
($\=, Acc) -> Acc;
(Chr, Acc) -> [Chr | Acc]
- end, [], base64:encode_to_string(guid())).
+ end, [], base64:encode_to_string(G)).
-binstring_guid(Prefix) ->
- list_to_binary(string_guid(Prefix)).
+binary(G, Prefix) ->
+ list_to_binary(string(G, Prefix)).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 177ae86859..80b4e768a3 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_heartbeat).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 8a08d4b673..9fa6213b94 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_limiter).
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 558e095751..a6b4eeb05f 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_log).
@@ -23,8 +23,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([debug/1, debug/2, message/4, info/1, info/2,
- warning/1, warning/2, error/1, error/2]).
+-export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]).
-define(SERVER, ?MODULE).
@@ -32,9 +31,15 @@
-ifdef(use_specs).
+-export_type([level/0]).
+
+-type(category() :: atom()).
+-type(level() :: 'info' | 'warning' | 'error').
+
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(debug/1 :: (string()) -> 'ok').
--spec(debug/2 :: (string(), [any()]) -> 'ok').
+
+-spec(log/3 :: (category(), level(), string()) -> 'ok').
+-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok').
-spec(info/1 :: (string()) -> 'ok').
-spec(info/2 :: (string(), [any()]) -> 'ok').
-spec(warning/1 :: (string()) -> 'ok').
@@ -42,84 +47,47 @@
-spec(error/1 :: (string()) -> 'ok').
-spec(error/2 :: (string(), [any()]) -> 'ok').
--spec(message/4 :: (_,_,_,_) -> 'ok').
-
-endif.
%%----------------------------------------------------------------------------
-
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
-debug(Fmt) ->
- gen_server:cast(?SERVER, {debug, Fmt}).
-
-debug(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {debug, Fmt, Args}).
-
-message(Direction, Channel, MethodRecord, Content) ->
- gen_server:cast(?SERVER,
- {message, Direction, Channel, MethodRecord, Content}).
+log(Category, Level, Fmt, Args) when is_list(Args) ->
+ gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}).
-info(Fmt) ->
- gen_server:cast(?SERVER, {info, Fmt}).
-
-info(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {info, Fmt, Args}).
-
-warning(Fmt) ->
- gen_server:cast(?SERVER, {warning, Fmt}).
-
-warning(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {warning, Fmt, Args}).
-
-error(Fmt) ->
- gen_server:cast(?SERVER, {error, Fmt}).
-
-error(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {error, Fmt, Args}).
+info(Fmt) -> log(default, info, Fmt).
+info(Fmt, Args) -> log(default, info, Fmt, Args).
+warning(Fmt) -> log(default, warning, Fmt).
+warning(Fmt, Args) -> log(default, warning, Fmt, Args).
+error(Fmt) -> log(default, error, Fmt).
+error(Fmt, Args) -> log(default, error, Fmt, Args).
%%--------------------------------------------------------------------
-init([]) -> {ok, none}.
+init([]) ->
+ {ok, CatLevelList} = application:get_env(log_levels),
+ CatLevels = [{Cat, level(Level)} || {Cat, Level} <- CatLevelList],
+ {ok, orddict:from_list(CatLevels)}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({debug, Fmt}, State) ->
- io:format("debug:: "), io:format(Fmt),
- error_logger:info_msg("debug:: " ++ Fmt),
- {noreply, State};
-handle_cast({debug, Fmt, Args}, State) ->
- io:format("debug:: "), io:format(Fmt, Args),
- error_logger:info_msg("debug:: " ++ Fmt, Args),
- {noreply, State};
-handle_cast({message, Direction, Channel, MethodRecord, Content}, State) ->
- io:format("~s ch~p ~p~n",
- [case Direction of
- in -> "-->";
- out -> "<--" end,
- Channel,
- {MethodRecord, Content}]),
- {noreply, State};
-handle_cast({info, Fmt}, State) ->
- error_logger:info_msg(Fmt),
- {noreply, State};
-handle_cast({info, Fmt, Args}, State) ->
- error_logger:info_msg(Fmt, Args),
- {noreply, State};
-handle_cast({warning, Fmt}, State) ->
- error_logger:warning_msg(Fmt),
- {noreply, State};
-handle_cast({warning, Fmt, Args}, State) ->
- error_logger:warning_msg(Fmt, Args),
- {noreply, State};
-handle_cast({error, Fmt}, State) ->
- error_logger:error_msg(Fmt),
- {noreply, State};
-handle_cast({error, Fmt, Args}, State) ->
- error_logger:error_msg(Fmt, Args),
- {noreply, State};
+handle_cast({log, Category, Level, Fmt, Args}, CatLevels) ->
+ CatLevel = case orddict:find(Category, CatLevels) of
+ {ok, L} -> L;
+ error -> level(info)
+ end,
+ case level(Level) =< CatLevel of
+ false -> ok;
+ true -> (case Level of
+ info -> fun error_logger:info_msg/2;
+ warning -> fun error_logger:warning_msg/2;
+ error -> fun error_logger:error_msg/2
+ end)(Fmt, Args)
+ end,
+ {noreply, CatLevels};
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -132,3 +100,9 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+%%--------------------------------------------------------------------
+
+level(info) -> 3;
+level(warning) -> 2;
+level(error) -> 1;
+level(none) -> 0.
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 02f3158f3b..f22ad874e3 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
@@ -178,11 +178,8 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%----------------------------------------------------------------------------
-zero_clamp(Sum) ->
- case Sum < ?EPSILON of
- true -> 0.0;
- false -> Sum
- end.
+zero_clamp(Sum) when Sum < ?EPSILON -> 0.0;
+zero_clamp(Sum) -> Sum.
internal_deregister(Pid, Demonitor,
State = #state { queue_duration_sum = Sum,
@@ -240,26 +237,21 @@ internal_update(State = #state { queue_durations = Durations,
fun (Proc = #process { reported = QueueDuration,
sent = PrevSendDuration,
callback = {M, F, A} }, true) ->
- case (case {QueueDuration, PrevSendDuration} of
- {infinity, infinity} ->
- true;
- {infinity, D} ->
- DesiredDurationAvg1 < D;
- {D, infinity} ->
- DesiredDurationAvg1 < D;
- {D1, D2} ->
- DesiredDurationAvg1 <
- lists:min([D1,D2])
- end) of
- true ->
- ok = erlang:apply(
- M, F, A ++ [DesiredDurationAvg1]),
- ets:insert(
- Durations,
- Proc #process {sent = DesiredDurationAvg1});
- false ->
- true
+ case should_send(QueueDuration, PrevSendDuration,
+ DesiredDurationAvg1) of
+ true -> ok = erlang:apply(
+ M, F, A ++ [DesiredDurationAvg1]),
+ ets:insert(
+ Durations,
+ Proc #process {
+ sent = DesiredDurationAvg1});
+ false -> true
end
end, true, Durations)
end,
State1.
+
+should_send(infinity, infinity, _) -> true;
+should_send(infinity, D, DD) -> DD < D;
+should_send(D, infinity, DD) -> DD < D;
+should_send(D1, D2, DD) -> DD < lists:min([D1, D2]).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 8ed2bede32..d0b5bab779 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_coordinator).
@@ -325,8 +325,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
true = link(GM),
GM
end,
- {ok, _TRef} =
- timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
+ ensure_gm_heartbeat(),
{ok, #state { q = Q,
gm = GM1,
monitors = dict:new(),
@@ -366,6 +365,11 @@ handle_cast({ensure_monitoring, Pids},
end, Monitors, Pids),
noreply(State #state { monitors = Monitors1 }).
+handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
+ gm:broadcast(GM, heartbeat),
+ ensure_gm_heartbeat(),
+ noreply(State);
+
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
State = #state { monitors = Monitors,
death_fun = DeathFun }) ->
@@ -419,3 +423,6 @@ noreply(State) ->
reply(Reply, State) ->
{reply, Reply, State, hibernate}.
+
+ensure_gm_heartbeat() ->
+ erlang:send_after(?ONE_SECOND, self(), send_gm_heartbeat).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index f60562efe6..64a4a7371e 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_master).
@@ -280,8 +280,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
-status(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
- BQ:status(BQS).
+status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:status(BQS) ++
+ [ {mirror_seen, dict:size(State #state.seen_status)},
+ {mirror_senders, sets:size(State #state.known_senders)} ].
invoke(?MODULE, Fun, State) ->
Fun(?MODULE, State);
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index baebc52b27..db7d8eccec 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_misc).
@@ -136,12 +136,16 @@ add_mirror(Queue, MirrorNode) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] -> Result = rabbit_mirror_queue_slave_sup:start_child(
MirrorNode, [Q]),
- rabbit_log:info(
- "Adding mirror of queue ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, Result]),
case Result of
- {ok, _Pid} -> ok;
- _ -> Result
+ {ok, undefined} -> %% Already running
+ ok;
+ {ok, _Pid} ->
+ rabbit_log:info(
+ "Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, Result]),
+ ok;
+ _ ->
+ Result
end;
[_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
end
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 73eaed1476..9bf89bce3f 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_slave).
@@ -90,7 +90,7 @@
}).
start_link(Q) ->
- gen_server2:start_link(?MODULE, [Q], []).
+ gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
@@ -98,59 +98,64 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) ->
gen_server2:call(QPid, info, infinity).
-init([#amqqueue { name = QueueName } = Q]) ->
- process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
- receive {joined, GM} ->
- ok
- end,
+init(#amqqueue { name = QueueName } = Q) ->
Self = self(),
Node = node(),
- {ok, MPid} =
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- %% ASSERTION
- [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node],
- MPids1 = MPids ++ [Self],
- ok = rabbit_amqqueue:store_queue(
- Q1 #amqqueue { slave_pids = MPids1 }),
- {ok, QPid}
- end),
- erlang:monitor(process, MPid),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [Self]),
- ok = rabbit_memory_monitor:register(
- Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
- {ok, BQ} = application:get_env(backing_queue_module),
- BQS = bq_init(BQ, Q, false),
- State = #state { q = Q,
- gm = GM,
- master_pid = MPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = undefined,
- sync_timer_ref = undefined,
-
- sender_queues = dict:new(),
- msg_id_ack = dict:new(),
- ack_num = 0,
-
- msg_id_status = dict:new(),
- known_senders = dict:new(),
-
- synchronised = false
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
+ [] -> MPids1 = MPids ++ [Self],
+ ok = rabbit_amqqueue:store_queue(
+ Q1 #amqqueue { slave_pids = MPids1 }),
+ {new, QPid};
+ [SPid] -> true = rabbit_misc:is_process_alive(SPid),
+ existing
+ end
+ end) of
+ {new, MPid} ->
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} ->
+ ok
+ end,
+ erlang:monitor(process, MPid),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [Self]),
+ ok = rabbit_memory_monitor:register(
+ Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = bq_init(BQ, Q, false),
+ State = #state { q = Q,
+ gm = GM,
+ master_pid = MPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+ sync_timer_ref = undefined,
+
+ sender_queues = dict:new(),
+ msg_id_ack = dict:new(),
+ ack_num = 0,
+
+ msg_id_status = dict:new(),
+ known_senders = dict:new(),
+
+ synchronised = false
},
- rabbit_event:notify(queue_slave_created,
- infos(?CREATION_EVENT_KEYS, State)),
- ok = gm:broadcast(GM, request_length),
- {ok, State, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-
-handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
- %% Synchronous, "immediate" delivery mode
+ rabbit_event:notify(queue_slave_created,
+ infos(?CREATION_EVENT_KEYS, State)),
+ ok = gm:broadcast(GM, request_length),
+ {ok, State, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+ ?DESIRED_HIBERNATE}};
+ existing ->
+ ignore
+ end.
+handle_call({deliver, Delivery = #delivery { immediate = true }},
+ From, State) ->
%% It is safe to reply 'false' here even if a) we've not seen the
%% msg via gm, or b) the master dies before we receive the msg via
%% gm. In the case of (a), we will eventually receive the msg via
@@ -166,8 +171,8 @@ handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
gen_server2:reply(From, false), %% master may deliver it, not us
noreply(maybe_enqueue_message(Delivery, false, State));
-handle_call({deliver, Delivery = #delivery {}}, From, State) ->
- %% Synchronous, "mandatory" delivery mode
+handle_call({deliver, Delivery = #delivery { mandatory = true }},
+ From, State) ->
gen_server2:reply(From, true), %% amqqueue throws away the result anyway
noreply(maybe_enqueue_message(Delivery, true, State));
@@ -208,8 +213,12 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery {}}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ case Flow of
+ flow -> credit_flow:ack(Sender);
+ noflow -> ok
+ end,
noreply(maybe_enqueue_message(Delivery, true, State));
handle_cast({set_maximum_since_use, Age}, State) ->
@@ -250,6 +259,10 @@ handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -447,7 +460,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
- MonitoringPids = [begin true = erlang:demonitor(MRef),
+ MonitoringPids = [begin put({ch_publisher, Pid}, MRef),
Pid
end || {Pid, MRef} <- dict:to_list(KS)],
ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
@@ -526,9 +539,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
CPid, BQ, BQS, GM, SS, MonitoringPids),
- MTC = dict:from_list(
- [{MsgId, {ChPid, MsgSeqNo}} ||
- {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]),
+ MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
+ gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
+ (_, MTC0) ->
+ MTC0
+ end, gb_trees:empty(), MSList),
NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
@@ -599,7 +614,8 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
ok = case dict:is_key(ChPid, KS) of
false -> ok;
- true -> confirm_sender_death(ChPid)
+ true -> credit_flow:peer_down(ChPid),
+ confirm_sender_death(ChPid)
end,
State.
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index fc04ec79ca..8eacb1f3c4 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_slave_sup).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0578cf7d9e..b6d38172b5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_misc).
@@ -34,11 +34,11 @@
-export([execute_mnesia_transaction/2]).
-export([execute_mnesia_tx_with_tail/1]).
-export([ensure_ok/2]).
--export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
+-export([tcp_name/3]).
-export([upmap/2, map_in_order/2]).
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
--export([format_stderr/2, with_local_io/1, local_info_msg/2]).
+-export([format/2, format_stderr/2, with_local_io/1, local_info_msg/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
@@ -141,9 +141,6 @@
-spec(execute_mnesia_tx_with_tail/1 ::
(thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B))).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
--spec(makenode/1 :: ({string(), string()} | string()) -> node()).
--spec(nodeparts/1 :: (node() | string()) -> {string(), string()}).
--spec(cookie_hash/0 :: () -> string()).
-spec(tcp_name/3 ::
(atom(), inet:ip_address(), rabbit_networking:ip_port())
-> atom()).
@@ -155,6 +152,7 @@
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom())
-> 'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()).
+-spec(format/2 :: (string(), [any()]) -> string()).
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(with_local_io/1 :: (fun (() -> A)) -> A).
-spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
@@ -222,7 +220,7 @@ frame_error(MethodName, BinaryFields) ->
protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName).
amqp_error(Name, ExplanationFormat, Params, Method) ->
- Explanation = lists:flatten(io_lib:format(ExplanationFormat, Params)),
+ Explanation = format(ExplanationFormat, Params),
#amqp_error{name = Name, explanation = Explanation, method = Method}.
protocol_error(Name, ExplanationFormat, Params) ->
@@ -276,8 +274,7 @@ val({Type, Value}) ->
true -> "~s";
false -> "~w"
end,
- lists:flatten(io_lib:format("the value '" ++ ValFmt ++ "' of type '~s'",
- [Value, Type])).
+ format("the value '" ++ ValFmt ++ "' of type '~s'", [Value, Type]).
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
%% expensive due to general mnesia overheads (figuring out table types
@@ -320,8 +317,7 @@ r_arg(VHostPath, Kind, Table, Key) ->
end.
rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
- lists:flatten(io_lib:format("~s '~s' in vhost '~s'",
- [Kind, Name, VHostPath])).
+ format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath]).
enable_cover() -> enable_cover(["."]).
@@ -337,7 +333,7 @@ enable_cover(Dirs) ->
end, ok, Dirs).
start_cover(NodesS) ->
- {ok, _} = cover:start([makenode(N) || N <- NodesS]),
+ {ok, _} = cover:start([rabbit_nodes:make(N) || N <- NodesS]),
ok.
report_cover() -> report_cover(["."]).
@@ -418,12 +414,25 @@ execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
%% elsewhere and get a consistent result even when that read
%% executes on a different node.
- case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of
- {atomic, Result} -> Result;
- {aborted, Reason} -> throw({error, Reason})
+ case worker_pool:submit(
+ fun () ->
+ case mnesia:is_transaction() of
+ false -> DiskLogBefore = mnesia_dumper:get_log_writes(),
+ Res = mnesia:sync_transaction(TxFun),
+ DiskLogAfter = mnesia_dumper:get_log_writes(),
+ case DiskLogAfter == DiskLogBefore of
+ true -> Res;
+ false -> {sync, Res}
+ end;
+ true -> mnesia:sync_transaction(TxFun)
+ end
+ end) of
+ {sync, {atomic, Result}} -> mnesia_sync:sync(), Result;
+ {sync, {aborted, Reason}} -> throw({error, Reason});
+ {atomic, Result} -> Result;
+ {aborted, Reason} -> throw({error, Reason})
end.
-
%% Like execute_mnesia_transaction/1 with additional Pre- and Post-
%% commit function
execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
@@ -450,29 +459,10 @@ execute_mnesia_tx_with_tail(TxFun) ->
ensure_ok(ok, _) -> ok;
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).
-makenode({Prefix, Suffix}) ->
- list_to_atom(lists:append([Prefix, "@", Suffix]));
-makenode(NodeStr) ->
- makenode(nodeparts(NodeStr)).
-
-nodeparts(Node) when is_atom(Node) ->
- nodeparts(atom_to_list(Node));
-nodeparts(NodeStr) ->
- case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of
- {Prefix, []} -> {_, Suffix} = nodeparts(node()),
- {Prefix, Suffix};
- {Prefix, Suffix} -> {Prefix, tl(Suffix)}
- end.
-
-cookie_hash() ->
- base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
-
tcp_name(Prefix, IPAddress, Port)
when is_atom(Prefix) andalso is_number(Port) ->
list_to_atom(
- lists:flatten(
- io_lib:format("~w_~s:~w",
- [Prefix, inet_parse:ntoa(IPAddress), Port]))).
+ format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])).
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
@@ -541,6 +531,8 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]),
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
+format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)).
+
format_stderr(Fmt, Args) ->
case os:type() of
{unix, _} ->
@@ -636,7 +628,7 @@ pid_to_string(Pid) when is_pid(Pid) ->
<<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
= term_to_binary(Pid),
Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
- lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])).
+ format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser]).
%% inverse of above
string_to_pid(Str) ->
@@ -728,13 +720,14 @@ gb_trees_foreach(Fun, Tree) ->
%% [{"-q",true},{"-p","/"}]}
get_options(Defs, As) ->
lists:foldl(fun(Def, {AsIn, RsIn}) ->
- {AsOut, Value} = case Def of
- {flag, Key} ->
- get_flag(Key, AsIn);
- {option, Key, Default} ->
- get_option(Key, Default, AsIn)
- end,
- {AsOut, [{Key, Value} | RsIn]}
+ {K, {AsOut, V}} =
+ case Def of
+ {flag, Key} ->
+ {Key, get_flag(Key, AsIn)};
+ {option, Key, Default} ->
+ {Key, get_option(Key, Default, AsIn)}
+ end,
+ {AsOut, [{K, V} | RsIn]}
end, {As, []}, Defs).
get_option(K, _Default, [K, V | As]) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c8c18843a8..4d419fd9a9 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
@@ -23,8 +23,8 @@
empty_ram_only_tables/0, copy_db/1, wait_for_tables/1,
create_cluster_nodes_config/1, read_cluster_nodes_config/0,
record_running_nodes/0, read_previously_running_nodes/0,
- delete_previously_running_nodes/0, running_nodes_filename/0,
- is_disc_node/0, on_node_down/1, on_node_up/1]).
+ running_nodes_filename/0, is_disc_node/0, on_node_down/1,
+ on_node_up/1]).
-export([table_names/0]).
@@ -64,7 +64,6 @@
-spec(read_cluster_nodes_config/0 :: () -> [node()]).
-spec(record_running_nodes/0 :: () -> 'ok').
-spec(read_previously_running_nodes/0 :: () -> [node()]).
--spec(delete_previously_running_nodes/0 :: () -> 'ok').
-spec(running_nodes_filename/0 :: () -> file:filename()).
-spec(is_disc_node/0 :: () -> boolean()).
-spec(on_node_up/1 :: (node()) -> 'ok').
@@ -98,12 +97,13 @@ status() ->
init() ->
ensure_mnesia_running(),
ensure_mnesia_dir(),
- ok = init_db(read_cluster_nodes_config(), true,
- fun maybe_upgrade_local_or_record_desired/0),
+ Nodes = read_cluster_nodes_config(),
+ ok = init_db(Nodes, should_be_disc_node(Nodes)),
%% We intuitively expect the global name server to be synced when
%% Mnesia is up. In fact that's not guaranteed to be the case - let's
%% make it so.
ok = global:sync(),
+ ok = delete_previously_running_nodes(),
ok.
is_db_empty() ->
@@ -174,8 +174,7 @@ cluster(ClusterNodes, Force) ->
%% Join the cluster
start_mnesia(),
try
- ok = init_db(ClusterNodes, Force,
- fun maybe_upgrade_local_or_record_desired/0),
+ ok = init_db(ClusterNodes, Force),
ok = create_cluster_nodes_config(ClusterNodes)
after
stop_mnesia()
@@ -268,6 +267,11 @@ table_definitions() ->
{type, ordered_set},
{match, #reverse_route{reverse_binding = reverse_binding_match(),
_='_'}}]},
+ {rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, record_info(fields, topic_trie_node)},
+ {type, ordered_set},
+ {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]},
{rabbit_topic_trie_edge,
[{record_name, topic_trie_edge},
{attributes, record_info(fields, topic_trie_edge)},
@@ -314,12 +318,12 @@ reverse_binding_match() ->
_='_'}.
binding_destination_match() ->
resource_match('_').
+trie_node_match() ->
+ #trie_node{ exchange_name = exchange_name_match(), _='_'}.
trie_edge_match() ->
- #trie_edge{exchange_name = exchange_name_match(),
- _='_'}.
+ #trie_edge{ exchange_name = exchange_name_match(), _='_'}.
trie_binding_match() ->
- #trie_binding{exchange_name = exchange_name_match(),
- _='_'}.
+ #trie_binding{exchange_name = exchange_name_match(), _='_'}.
exchange_name_match() ->
resource_match(exchange).
queue_name_match() ->
@@ -496,6 +500,18 @@ delete_previously_running_nodes() ->
FileName, Reason}})
end.
+init_db(ClusterNodes, Force) ->
+ init_db(
+ ClusterNodes, Force,
+ fun () ->
+ case rabbit_upgrade:maybe_upgrade_local() of
+ ok -> ok;
+ %% If we're just starting up a new node we won't have a
+ %% version
+ version_not_available -> ok = rabbit_version:record_desired()
+ end
+ end).
+
%% Take a cluster node config and create the right kind of node - a
%% standalone disk node, or disk or ram node connected to the
%% specified cluster nodes. If Force is false, don't allow
@@ -504,20 +520,12 @@ init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
UClusterNodes = lists:usort(ClusterNodes),
ProperClusterNodes = UClusterNodes -- [node()],
case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
+ {ok, []} when not Force andalso ProperClusterNodes =/= [] ->
+ throw({error, {failed_to_cluster_with, ProperClusterNodes,
+ "Mnesia could not connect to any disc nodes."}});
{ok, Nodes} ->
- case Force of
- false -> FailedClusterNodes = ProperClusterNodes -- Nodes,
- case FailedClusterNodes of
- [] -> ok;
- _ -> throw({error, {failed_to_cluster_with,
- FailedClusterNodes,
- "Mnesia could not connect "
- "to some nodes."}})
- end;
- true -> ok
- end,
- WantDiscNode = should_be_disc_node(ClusterNodes),
WasDiscNode = is_disc_node(),
+ WantDiscNode = should_be_disc_node(ClusterNodes),
%% We create a new db (on disk, or in ram) in the first
%% two cases and attempt to upgrade the in the other two
case {Nodes, WasDiscNode, WantDiscNode} of
@@ -567,14 +575,6 @@ init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
end.
-maybe_upgrade_local_or_record_desired() ->
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ok;
- %% If we're just starting up a new node we won't have a
- %% version
- version_not_available -> ok = rabbit_version:record_desired()
- end.
-
schema_ok_or_move() ->
case check_schema_integrity() of
ok ->
@@ -622,10 +622,9 @@ move_db() ->
stop_mnesia(),
MnesiaDir = filename:dirname(dir() ++ "/"),
{{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(),
- BackupDir = lists:flatten(
- io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w",
- [MnesiaDir,
- Year, Month, Day, Hour, Minute, Second])),
+ BackupDir = rabbit_misc:format(
+ "~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w",
+ [MnesiaDir, Year, Month, Day, Hour, Minute, Second]),
case file:rename(MnesiaDir, BackupDir) of
ok ->
%% NB: we cannot use rabbit_log here since it may not have
@@ -733,16 +732,18 @@ reset(Force) ->
false -> ok
end,
Node = node(),
+ Nodes = all_clustered_nodes() -- [Node],
case Force of
true -> ok;
false ->
ensure_mnesia_dir(),
start_mnesia(),
- {Nodes, RunningNodes} =
+ RunningNodes =
try
- ok = init(),
- {all_clustered_nodes() -- [Node],
- running_clustered_nodes() -- [Node]}
+ %% Force=true here so that reset still works when clustered
+ %% with a node which is down
+ ok = init_db(read_cluster_nodes_config(), true),
+ running_clustered_nodes() -- [Node]
after
stop_mnesia()
end,
@@ -750,6 +751,10 @@ reset(Force) ->
rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
cannot_delete_schema)
end,
+ %% We need to make sure that we don't end up in a distributed
+ %% Erlang system with nodes while not being in an Mnesia cluster
+ %% with them. We don't handle that well.
+ [erlang:disconnect_node(N) || N <- Nodes],
ok = delete_cluster_nodes_config(),
%% remove persisted messages and any other garbage we find
ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index b7de27d4b5..f685b109a9 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_file).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e6a32b9023..56265136dd 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store).
@@ -21,7 +21,7 @@
-export([start_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
- write/3, read/2, contains/2, remove/2]).
+ write/3, write_flow/3, read/2, contains/2, remove/2]).
-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
delete_file/2]). %% internal
@@ -152,6 +152,7 @@
-spec(close_all_indicated/1 ::
(client_msstate()) -> rabbit_types:ok(client_msstate())).
-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
+-spec(write_flow/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
@@ -436,7 +437,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
- Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
+ Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun},
+ infinity),
#client_msstate { server = Server,
client_ref = Ref,
file_handle_cache = dict:new(),
@@ -460,12 +462,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-write(MsgId, Msg,
- CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
- client_ref = CRef }) ->
- ok = client_update_flying(+1, MsgId, CState),
- ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
- ok = server_cast(CState, {write, CRef, MsgId}).
+write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) ->
+ credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND),
+ client_write(MsgId, Msg, flow, CState).
+
+write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
read(MsgId,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
@@ -500,6 +501,13 @@ server_call(#client_msstate { server = Server }, Msg) ->
server_cast(#client_msstate { server = Server }, Msg) ->
gen_server2:cast(Server, Msg).
+client_write(MsgId, Msg, Flow,
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+ client_ref = CRef }) ->
+ ok = client_update_flying(+1, MsgId, CState),
+ ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
+ ok = server_cast(CState, {write, CRef, MsgId, Flow}).
+
client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
@@ -666,7 +674,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
ClientRefs, Dir, Server),
Clients = dict:from_list(
- [{CRef, {undefined, undefined}} || CRef <- ClientRefs1]),
+ [{CRef, {undefined, undefined, undefined}} ||
+ CRef <- ClientRefs1]),
%% CleanShutdown => msg location index and file_summary both
%% recovered correctly.
true = case {FileSummaryRecovered, CleanShutdown} of
@@ -731,10 +740,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- successfully_recovered_state -> 7;
- {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7;
- {read, _MsgId} -> 2;
- _ -> 0
+ successfully_recovered_state -> 7;
+ {new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7;
+ {read, _MsgId} -> 2;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -755,7 +764,7 @@ prioritise_info(Msg, _State) ->
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
+handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From,
State = #msstate { dir = Dir,
index_state = IndexState,
index_module = IndexModule,
@@ -765,7 +774,7 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
flying_ets = FlyingEts,
clients = Clients,
gc_pid = GCPid }) ->
- Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
+ Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
CurFileCacheEts, FlyingEts},
State #msstate { clients = Clients1 });
@@ -789,11 +798,19 @@ handle_cast({client_dying, CRef},
handle_cast({client_delete, CRef},
State = #msstate { clients = Clients }) ->
+ {CPid, _, _} = dict:fetch(CRef, Clients),
+ credit_flow:peer_down(CPid),
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
-handle_cast({write, CRef, MsgId},
- State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+handle_cast({write, CRef, MsgId, Flow},
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts,
+ clients = Clients }) ->
+ case Flow of
+ flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
+ credit_flow:ack(CPid, ?CREDIT_DISC_BOUND);
+ noflow -> ok
+ end,
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
case update_flying(-1, MsgId, CRef, State) of
process ->
@@ -1204,10 +1221,10 @@ update_pending_confirms(Fun, CRef,
State = #msstate { clients = Clients,
cref_to_msg_ids = CTM }) ->
case dict:fetch(CRef, Clients) of
- {undefined, _CloseFDsFun} -> State;
- {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
- State #msstate {
- cref_to_msg_ids = CTM1 }
+ {_CPid, undefined, _CloseFDsFun} -> State;
+ {_CPid, MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
+ State #msstate {
+ cref_to_msg_ids = CTM1 }
end.
record_pending_confirm(CRef, MsgId, State) ->
@@ -1294,8 +1311,10 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
case (ets:update_element(FileHandlesEts, Key, {2, close})
andalso Invoke) of
true -> case dict:fetch(Ref, ClientRefs) of
- {_MsgOnDiskFun, undefined} -> ok;
- {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun()
+ {_CPid, _MsgOnDiskFun, undefined} ->
+ ok;
+ {_CPid, _MsgOnDiskFun, CloseFDsFun} ->
+ ok = CloseFDsFun()
end;
false -> ok
end
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
index d6dc556878..9c31439f51 100644
--- a/src/rabbit_msg_store_ets_index.erl
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_ets_index).
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 77f1f04ee2..3b61ed0bd7 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_gc).
diff --git a/src/rabbit_msg_store_index.erl b/src/rabbit_msg_store_index.erl
index ef8b7cdfed..2f36256c61 100644
--- a/src/rabbit_msg_store_index.erl
+++ b/src/rabbit_msg_store_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_index).
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index b944ec81a1..02889b93a3 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_net).
@@ -19,7 +19,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
- sockname/1, peername/1, peercert/1]).
+ sockname/1, peername/1, peercert/1, connection_string/2]).
%%---------------------------------------------------------------------------
@@ -62,6 +62,8 @@
-spec(peercert/1 ::
(socket())
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
+-spec(connection_string/2 ::
+ (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
-endif.
@@ -141,3 +143,19 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock).
peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
peercert(Sock) when is_port(Sock) -> nossl.
+
+connection_string(Sock, Direction) ->
+ {From, To} = case Direction of
+ inbound -> {fun peername/1, fun sockname/1};
+ outbound -> {fun sockname/1, fun peername/1}
+ end,
+ case {From(Sock), To(Sock)} of
+ {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->
+ {ok, rabbit_misc:format("~s:~p -> ~s:~p",
+ [rabbit_misc:ntoab(FromAddress), FromPort,
+ rabbit_misc:ntoab(ToAddress), ToPort])};
+ {{error, _Reason} = Error, _} ->
+ Error;
+ {_, {error, _Reason} = Error} ->
+ Error
+ end.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 045ab89a70..825d1bb1e0 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_networking).
@@ -24,7 +24,7 @@
close_connection/2, force_connection_event_refresh/0]).
%%used by TCP-based transports, e.g. STOMP adapter
--export([check_tcp_listener_address/2,
+-export([tcp_listener_addresses/1, tcp_listener_spec/6,
ensure_ssl/0, ssl_transform_fun/1]).
-export([tcp_listener_started/3, tcp_listener_stopped/3,
@@ -47,12 +47,16 @@
-export_type([ip_port/0, hostname/0]).
-type(hostname() :: inet:hostname()).
--type(ip_port() :: inet:ip_port()).
+-type(ip_port() :: inet:port_number()).
-type(family() :: atom()).
-type(listener_config() :: ip_port() |
{hostname(), ip_port()} |
{hostname(), ip_port(), family()}).
+-type(address() :: {inet:ip_address(), ip_port(), family()}).
+-type(name_prefix() :: atom()).
+-type(protocol() :: atom()).
+-type(label() :: string()).
-spec(start/0 :: () -> 'ok').
-spec(start_tcp_listener/1 :: (listener_config()) -> 'ok').
@@ -76,8 +80,10 @@
-spec(force_connection_event_refresh/0 :: () -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(check_tcp_listener_address/2 :: (atom(), listener_config())
- -> [{inet:ip_address(), ip_port(), family(), atom()}]).
+-spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]).
+-spec(tcp_listener_spec/6 ::
+ (name_prefix(), address(), [gen_tcp:listen_option()], protocol(),
+ label(), rabbit_types:mfargs()) -> supervisor:child_spec()).
-spec(ensure_ssl/0 :: () -> rabbit_types:infos()).
-spec(ssl_transform_fun/1 ::
(rabbit_types:infos())
@@ -140,39 +146,6 @@ start() ->
transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
-%% inet_parse:address takes care of ip string, like "0.0.0.0"
-%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
-%% and runs 'inet_gethost' port process for dns lookups.
-%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
-
-getaddr(Host, Family) ->
- case inet_parse:address(Host) of
- {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
- {error, _} -> gethostaddr(Host, Family)
- end.
-
-gethostaddr(Host, auto) ->
- Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
- case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
- [] -> host_lookup_error(Host, Lookups);
- IPs -> IPs
- end;
-
-gethostaddr(Host, Family) ->
- case inet:getaddr(Host, Family) of
- {ok, IPAddress} -> [{IPAddress, Family}];
- {error, Reason} -> host_lookup_error(Host, Reason)
- end.
-
-host_lookup_error(Host, Reason) ->
- error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
- throw({error, {invalid_host, Host, Reason}}).
-
-resolve_family({_,_,_,_}, auto) -> inet;
-resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
-resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
-resolve_family(_, F) -> F.
-
ensure_ssl() ->
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
@@ -191,8 +164,6 @@ ssl_transform_fun(SslOpts) ->
fun (Sock) ->
case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
{ok, SslSock} ->
- rabbit_log:info("upgraded TCP connection ~p to SSL~n",
- [self()]),
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
{error, Reason} ->
{error, {ssl_upgrade_error, Reason}};
@@ -201,31 +172,36 @@ ssl_transform_fun(SslOpts) ->
end
end.
-check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) ->
- check_tcp_listener_address_auto(NamePrefix, Port);
-
-check_tcp_listener_address(NamePrefix, {"auto", Port}) ->
+tcp_listener_addresses(Port) when is_integer(Port) ->
+ tcp_listener_addresses_auto(Port);
+tcp_listener_addresses({"auto", Port}) ->
%% Variant to prevent lots of hacking around in bash and batch files
- check_tcp_listener_address_auto(NamePrefix, Port);
-
-check_tcp_listener_address(NamePrefix, {Host, Port}) ->
+ tcp_listener_addresses_auto(Port);
+tcp_listener_addresses({Host, Port}) ->
%% auto: determine family IPv4 / IPv6 after converting to IP address
- check_tcp_listener_address(NamePrefix, {Host, Port, auto});
-
-check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) ->
- if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok;
- true -> error_logger:error_msg("invalid port ~p - not 0..65535~n",
- [Port]),
- throw({error, {invalid_port, Port}})
- end,
- [{IPAddress, Port, Family,
- rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} ||
- {IPAddress, Family} <- getaddr(Host, Family0)].
-
-check_tcp_listener_address_auto(NamePrefix, Port) ->
- lists:append([check_tcp_listener_address(NamePrefix, Listener) ||
+ tcp_listener_addresses({Host, Port, auto});
+tcp_listener_addresses({Host, Port, Family0})
+ when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) ->
+ [{IPAddress, Port, Family} ||
+ {IPAddress, Family} <- getaddr(Host, Family0)];
+tcp_listener_addresses({_Host, Port, _Family0}) ->
+ error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]),
+ throw({error, {invalid_port, Port}}).
+
+tcp_listener_addresses_auto(Port) ->
+ lists:append([tcp_listener_addresses(Listener) ||
Listener <- port_to_listeners(Port)]).
+tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
+ Protocol, Label, OnConnect) ->
+ {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
+ {tcp_listener_sup, start_link,
+ [IPAddress, Port, [Family | SocketOpts],
+ {?MODULE, tcp_listener_started, [Protocol]},
+ {?MODULE, tcp_listener_stopped, [Protocol]},
+ OnConnect, Label]},
+ transient, infinity, supervisor, [tcp_listener_sup]}.
+
start_tcp_listener(Listener) ->
start_listener(Listener, amqp, "TCP Listener",
{?MODULE, start_client, []}).
@@ -235,27 +211,26 @@ start_ssl_listener(Listener, SslOpts) ->
{?MODULE, start_ssl_client, [SslOpts]}).
start_listener(Listener, Protocol, Label, OnConnect) ->
- [start_listener0(Spec, Protocol, Label, OnConnect) ||
- Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
+ [start_listener0(Address, Protocol, Label, OnConnect) ||
+ Address <- tcp_listener_addresses(Listener)],
ok.
-start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
- {ok,_} = supervisor:start_child(
- rabbit_sup,
- {Name,
- {tcp_listener_sup, start_link,
- [IPAddress, Port, [Family | tcp_opts()],
- {?MODULE, tcp_listener_started, [Protocol]},
- {?MODULE, tcp_listener_stopped, [Protocol]},
- OnConnect, Label]},
- transient, infinity, supervisor, [tcp_listener_sup]}).
+start_listener0(Address, Protocol, Label, OnConnect) ->
+ Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(),
+ Protocol, Label, OnConnect),
+ case supervisor:start_child(rabbit_sup, Spec) of
+ {ok, _} -> ok;
+ {error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address,
+ exit({could_not_start_tcp_listener,
+ {rabbit_misc:ntoa(IPAddress), Port}})
+ end.
stop_tcp_listener(Listener) ->
- [stop_tcp_listener0(Spec) ||
- Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
+ [stop_tcp_listener0(Address) ||
+ Address <- tcp_listener_addresses(Listener)],
ok.
-stop_tcp_listener0({IPAddress, Port, _Family, Name}) ->
+stop_tcp_listener0({IPAddress, Port, _Family}) ->
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
ok = supervisor:delete_child(rabbit_sup, Name).
@@ -294,6 +269,16 @@ start_client(Sock, SockTransform) ->
{ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = rabbit_net:controlling_process(Sock, Reader),
Reader ! {go, Sock, SockTransform},
+
+ %% In the event that somebody floods us with connections, the
+ %% reader processes can spew log events at error_logger faster
+ %% than it can keep up, causing its mailbox to grow unbounded
+ %% until we eat all the memory available and crash. So here is a
+ %% meaningless synchronous call to the underlying gen_event
+ %% mechanism. When it returns the mailbox is drained, and we
+ %% return to our caller to accept more connetions.
+ gen_event:which_handlers(error_logger),
+
Reader.
start_client(Sock) ->
@@ -363,6 +348,38 @@ tcp_opts() ->
{ok, Opts} = application:get_env(rabbit, tcp_listen_options),
Opts.
+%% inet_parse:address takes care of ip string, like "0.0.0.0"
+%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
+%% and runs 'inet_gethost' port process for dns lookups.
+%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
+getaddr(Host, Family) ->
+ case inet_parse:address(Host) of
+ {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
+ {error, _} -> gethostaddr(Host, Family)
+ end.
+
+gethostaddr(Host, auto) ->
+ Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
+ case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
+ [] -> host_lookup_error(Host, Lookups);
+ IPs -> IPs
+ end;
+
+gethostaddr(Host, Family) ->
+ case inet:getaddr(Host, Family) of
+ {ok, IPAddress} -> [{IPAddress, Family}];
+ {error, Reason} -> host_lookup_error(Host, Reason)
+ end.
+
+host_lookup_error(Host, Reason) ->
+ error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
+ throw({error, {invalid_host, Host, Reason}}).
+
+resolve_family({_,_,_,_}, auto) -> inet;
+resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
+resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
+resolve_family(_, F) -> F.
+
%%--------------------------------------------------------------------
%% There are three kinds of machine (for our purposes).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 8aa24ab53e..323cf0ce9e 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_node_monitor).
@@ -55,29 +55,32 @@ notify_cluster() ->
{_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
end,
%% register other active rabbits with this rabbit
- [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ],
+ [ rabbit_running_on(N) || N <- Nodes ],
ok.
%%--------------------------------------------------------------------
init([]) ->
- {ok, no_state}.
+ {ok, ordsets:new()}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({rabbit_running_on, Node}, State) ->
- rabbit_log:info("rabbit on ~p up~n", [Node]),
- erlang:monitor(process, {rabbit, Node}),
- ok = handle_live_rabbit(Node),
- {noreply, State};
+handle_cast({rabbit_running_on, Node}, Nodes) ->
+ case ordsets:is_element(Node, Nodes) of
+ true -> {noreply, Nodes};
+ false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
+ erlang:monitor(process, {rabbit, Node}),
+ ok = handle_live_rabbit(Node),
+ {noreply, ordsets:add_element(Node, Nodes)}
+ end;
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
- rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) ->
+ rabbit_log:info("rabbit on node ~p down~n", [Node]),
ok = handle_dead_rabbit(Node),
- {noreply, State};
+ {noreply, ordsets:del_element(Node, Nodes)};
handle_info(_Info, State) ->
{noreply, State}.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
new file mode 100644
index 0000000000..329c07dc9f
--- /dev/null
+++ b/src/rabbit_nodes.erl
@@ -0,0 +1,94 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_nodes).
+
+-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0]).
+
+-define(EPMD_TIMEOUT, 30000).
+
+%%----------------------------------------------------------------------------
+%% Specs
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(names/1 :: (string()) -> rabbit_types:ok_or_error2(
+ [{string(), integer()}], term())).
+-spec(diagnostics/1 :: ([node()]) -> string()).
+-spec(make/1 :: ({string(), string()} | string()) -> node()).
+-spec(parts/1 :: (node() | string()) -> {string(), string()}).
+-spec(cookie_hash/0 :: () -> string()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+names(Hostname) ->
+ Self = self(),
+ process_flag(trap_exit, true),
+ Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end),
+ timer:exit_after(?EPMD_TIMEOUT, Pid, timeout),
+ Res = receive
+ {names, Names} -> Names;
+ {'EXIT', Pid, Reason} -> {error, Reason}
+ end,
+ process_flag(trap_exit, false),
+ Res.
+
+diagnostics(Nodes) ->
+ Hosts = lists:usort([element(2, parts(Node)) || Node <- Nodes]),
+ NodeDiags = [{"~nDIAGNOSTICS~n===========~n~n"
+ "nodes in question: ~p~n~n"
+ "hosts, their running nodes and ports:", [Nodes]}] ++
+ [diagnostics_host(Host) || Host <- Hosts] ++
+ diagnostics0(),
+ lists:flatten([io_lib:format(F ++ "~n", A) || NodeDiag <- NodeDiags,
+ {F, A} <- [NodeDiag]]).
+
+diagnostics0() ->
+ [{"~ncurrent node details:~n- node name: ~w", [node()]},
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> {"- home dir: ~s", [Home]};
+ Other -> {"- no home dir: ~p", [Other]}
+ end,
+ {"- cookie hash: ~s", [cookie_hash()]}].
+
+diagnostics_host(Host) ->
+ case names(Host) of
+ {error, EpmdReason} ->
+ {"- unable to connect to epmd on ~s: ~w",
+ [Host, EpmdReason]};
+ {ok, NamePorts} ->
+ {"- ~s: ~p",
+ [Host, [{list_to_atom(Name), Port} ||
+ {Name, Port} <- NamePorts]]}
+ end.
+
+make({Prefix, Suffix}) -> list_to_atom(lists:append([Prefix, "@", Suffix]));
+make(NodeStr) -> make(parts(NodeStr)).
+
+parts(Node) when is_atom(Node) ->
+ parts(atom_to_list(Node));
+parts(NodeStr) ->
+ case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of
+ {Prefix, []} -> {_, Suffix} = parts(node()),
+ {Prefix, Suffix};
+ {Prefix, Suffix} -> {Prefix, tl(Suffix)}
+ end.
+
+cookie_hash() ->
+ base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 0862f1b2c7..7b85ab15fc 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_plugins).
@@ -55,13 +55,20 @@ start() ->
CmdArgsAndOpts -> CmdArgsAndOpts
end,
Command = list_to_atom(Command0),
+ PrintInvalidCommandError =
+ fun () ->
+ print_error("invalid command '~s'",
+ [string:join([atom_to_list(Command) | Args], " ")])
+ end,
case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
ok ->
rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- print_error("invalid command '~s'",
- [string:join([atom_to_list(Command) | Args], " ")]),
+ PrintInvalidCommandError(),
+ usage();
+ {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
+ PrintInvalidCommandError(),
usage();
{error, Reason} ->
print_error("~p", [Reason]),
@@ -111,8 +118,7 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
[] -> io:format("Plugin configuration unchanged.~n");
_ -> print_list("The following plugins have been enabled:",
NewImplicitlyEnabled -- ImplicitlyEnabled),
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n")
+ report_change()
end;
action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
@@ -140,8 +146,7 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
print_list("The following plugins have been disabled:",
ImplicitlyEnabled -- NewImplicitlyEnabled),
write_enabled_plugins(PluginsFile, NewEnabled),
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n")
+ report_change()
end.
%%----------------------------------------------------------------------------
@@ -327,6 +332,9 @@ lookup_plugins(Names, AllPlugins) ->
read_enabled_plugins(PluginsFile) ->
case rabbit_file:read_term_file(PluginsFile) of
{ok, [Plugins]} -> Plugins;
+ {ok, []} -> [];
+ {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
+ PluginsFile}});
{error, enoent} -> [];
{error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
PluginsFile, Reason}})
@@ -374,3 +382,17 @@ maybe_warn_mochiweb(Enabled) ->
false ->
ok
end.
+
+report_change() ->
+ io:format("Plugin configuration has changed. "
+ "Restart RabbitMQ for changes to take effect.~n"),
+ case os:type() of
+ {win32, _OsName} ->
+ io:format("If you have RabbitMQ running as a service then you must"
+ " reinstall by running~n rabbitmq-service.bat stop~n"
+ " rabbitmq-service.bat install~n"
+ " rabbitmq-service.bat start~n~n");
+ _ ->
+ ok
+ end.
+
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 50444dc49d..162d44f13e 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_prelaunch).
@@ -22,7 +22,6 @@
-define(BaseApps, [rabbit]).
-define(ERROR_CODE, 1).
--define(EPMD_TIMEOUT, 30000).
%%----------------------------------------------------------------------------
%% Specs
@@ -244,16 +243,15 @@ duplicate_node_check([]) ->
%% Ignore running node while installing windows service
ok;
duplicate_node_check(NodeStr) ->
- Node = rabbit_misc:makenode(NodeStr),
- {NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- case names(NodeHost) of
+ Node = rabbit_nodes:make(NodeStr),
+ {NodeName, NodeHost} = rabbit_nodes:parts(Node),
+ case rabbit_nodes:names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
true -> io:format("node with name ~p "
"already running on ~p~n",
[NodeName, NodeHost]),
- [io:format(Fmt ++ "~n", Args) ||
- {Fmt, Args} <- rabbit_control:diagnostics(Node)],
+ io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"),
terminate(?ERROR_CODE);
false -> ok
end;
@@ -279,15 +277,3 @@ terminate(Status) ->
after infinity -> ok
end
end.
-
-names(Hostname) ->
- Self = self(),
- process_flag(trap_exit, true),
- Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end),
- timer:exit_after(?EPMD_TIMEOUT, Pid, timeout),
- Res = receive
- {names, Names} -> Names;
- {'EXIT', Pid, Reason} -> {error, Reason}
- end,
- process_flag(trap_exit, false),
- Res.
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 9b45e79869..df957d883c 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_queue_collector).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f03c1d1c76..4c8793f1ad 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_queue_index).
@@ -491,7 +491,7 @@ recover_message(false, _, no_del, RelSeq, Segment) ->
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
<<Num:128>> = erlang:md5(term_to_binary(Name)),
- lists:flatten(io_lib:format("~.36B", [Num])).
+ rabbit_misc:format("~.36B", [Num]).
queues_dir() ->
filename:join(rabbit_mnesia:dir(), "queues").
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 694abd9e49..01242e81b8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_reader).
@@ -27,11 +27,9 @@
-export([conserve_memory/2, server_properties/1]).
--export([process_channel_frame/5]). %% used by erlang-client
-
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
--define(CLOSING_TIMEOUT, 1).
+-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
@@ -40,10 +38,12 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state, conserve_memory,
+ last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
- send_pend, state, channels]).
+ send_pend, state, last_blocked_by, last_blocked_age,
+ channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
@@ -90,10 +90,6 @@
-spec(system_continue/3 :: (_,_,#v1{}) -> any()).
-spec(system_terminate/4 :: (_,_,_,_) -> none()).
--spec(process_channel_frame/5 ::
- (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(),
- tuple()) -> tuple()).
-
-endif.
%%--------------------------------------------------------------------------
@@ -177,25 +173,26 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
server_capabilities(_) ->
[].
+log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
+
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) ->
case Fun(Sock) of
{ok, Res} -> Res;
- {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n",
- [self(), Reason]),
- rabbit_log:info("closing TCP connection ~p~n",
- [self()]),
+ {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
+ [self(), Reason]),
exit(normal)
end.
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
- {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
- PeerAddressS = rabbit_misc:ntoab(PeerAddress),
- rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
+ ConnStr = socket_op(Sock, fun (Sock0) ->
+ rabbit_net:connection_string(
+ Sock0, inbound)
+ end),
+ log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]),
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
@@ -220,21 +217,22 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
buf = [],
buf_len = 0,
auth_mechanism = none,
- auth_state = none},
+ auth_state = none,
+ conserve_memory = false,
+ last_blocked_by = none,
+ last_blocked_at = never},
try
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
- handshake, 8))
+ handshake, 8)),
+ log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr])
catch
- Ex -> (if Ex == connection_closed_abruptly ->
- fun rabbit_log:warning/2;
- true ->
- fun rabbit_log:error/2
- end)("exception on TCP connection ~p from ~s:~p~n~p~n",
- [self(), PeerAddressS, PeerPort, Ex])
+ Ex -> log(case Ex of
+ connection_closed_abruptly -> warning;
+ _ -> error
+ end, "closing AMQP connection ~p (~s):~n~p~n",
+ [self(), ConnStr, Ex])
after
- rabbit_log:info("closing TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
%% We don't close the socket explicitly. The reader is the
%% controlling process and hence its termination will close
%% the socket. Furthermore, gen_tcp:close/1 waits for pending
@@ -267,21 +265,20 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf],
buf_len = BufLen + size(Data),
pending_recv = false});
- closed -> if State#v1.connection_state =:= closed ->
- State;
- true ->
- throw(connection_closed_abruptly)
+ closed -> case State#v1.connection_state of
+ closed -> State;
+ _ -> throw(connection_closed_abruptly)
end;
{error, Reason} -> throw({inet_error, Reason});
{other, Other} -> handle_other(Other, Deb, State)
end.
handle_other({conserve_memory, Conserve}, Deb, State) ->
- recvloop(Deb, internal_conserve_memory(Conserve, State));
+ recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
- mainloop(Deb, maybe_close(State));
+ mainloop(Deb, maybe_close(control_throttle(State)));
handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -329,22 +326,25 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
catch Error -> {error, Error}
end),
mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
+handle_other({'$gen_cast', force_event_refresh}, Deb, State)
+ when ?IS_RUNNING(State) ->
rabbit_event:notify(connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)]),
mainloop(Deb, State);
+handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
+ %% Ignore, we will emit a created event once we start running.
+ mainloop(Deb, State);
handle_other(emit_stats, Deb, State) ->
mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
+handle_other({bump_credit, Msg}, Deb, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ recvloop(Deb, control_throttle(State));
handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
-switch_callback(State = #v1{connection_state = blocked,
- heartbeater = Heartbeater}, Callback, Length) ->
- ok = rabbit_heartbeat:pause_monitor(Heartbeater),
- State#v1{callback = Callback, recv_len = Length};
switch_callback(State, Callback, Length) ->
State#v1{callback = Callback, recv_len = Length}.
@@ -355,17 +355,30 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-internal_conserve_memory(true, State = #v1{connection_state = running}) ->
- State#v1{connection_state = blocking};
-internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
- State#v1{connection_state = running};
-internal_conserve_memory(false, State = #v1{connection_state = blocked,
- heartbeater = Heartbeater}) ->
- ok = rabbit_heartbeat:resume_monitor(Heartbeater),
- State#v1{connection_state = running};
-internal_conserve_memory(_Conserve, State) ->
+control_throttle(State = #v1{connection_state = CS,
+ conserve_memory = Mem}) ->
+ case {CS, Mem orelse credit_flow:blocked()} of
+ {running, true} -> State#v1{connection_state = blocking};
+ {blocking, false} -> State#v1{connection_state = running};
+ {blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
+ State#v1.heartbeater),
+ State#v1{connection_state = running};
+ {blocked, true} -> update_last_blocked_by(State);
+ {_, _} -> State
+ end.
+
+maybe_block(State = #v1{connection_state = blocking}) ->
+ ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
+ update_last_blocked_by(State#v1{connection_state = blocked,
+ last_blocked_at = erlang:now()});
+maybe_block(State) ->
State.
+update_last_blocked_by(State = #v1{conserve_memory = true}) ->
+ State#v1{last_blocked_by = mem};
+update_last_blocked_by(State = #v1{conserve_memory = false}) ->
+ State#v1{last_blocked_by = flow}.
+
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -376,34 +389,30 @@ close_connection(State = #v1{queue_collector = Collector,
rabbit_queue_collector:delete_all(Collector),
%% We terminate the connection after the specified interval, but
%% no later than ?CLOSING_TIMEOUT seconds.
- TimeoutMillisec =
- 1000 * if TimeoutSec > 0 andalso
- TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
- true -> ?CLOSING_TIMEOUT
- end,
- erlang:send_after(TimeoutMillisec, self(), terminate_connection),
+ erlang:send_after((if TimeoutSec > 0 andalso
+ TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
+ true -> ?CLOSING_TIMEOUT
+ end) * 1000, self(), terminate_connection),
State#v1{connection_state = closed}.
handle_dependent_exit(ChPid, Reason, State) ->
- case termination_kind(Reason) of
- controlled ->
- channel_cleanup(ChPid),
- maybe_close(State);
- uncontrolled ->
- case channel_cleanup(ChPid) of
- undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
- Channel -> rabbit_log:error(
- "connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
- maybe_close(
- handle_exception(State, Channel, Reason))
- end
+ case {channel_cleanup(ChPid), termination_kind(Reason)} of
+ {undefined, uncontrolled} ->
+ exit({abnormal_dependent_exit, ChPid, Reason});
+ {_Channel, controlled} ->
+ maybe_close(control_throttle(State));
+ {Channel, uncontrolled} ->
+ log(error, "AMQP connection ~p, channel ~p - error:~n~p~n",
+ [self(), Channel, Reason]),
+ maybe_close(handle_exception(control_throttle(State),
+ Channel, Reason))
end.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
undefined -> undefined;
- {Channel, MRef} -> erase({channel, Channel}),
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
erase({ch_pid, ChPid}),
erlang:demonitor(MRef, [flush]),
Channel
@@ -432,19 +441,16 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
- case channel_cleanup(ChPid) of
- undefined ->
+ case {channel_cleanup(ChPid), termination_kind(Reason)} of
+ {undefined, _} ->
exit({abnormal_dependent_exit, ChPid, Reason});
- Channel ->
- case termination_kind(Reason) of
- controlled ->
- ok;
- uncontrolled ->
- rabbit_log:error(
- "connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason])
- end,
+ {_Channel, controlled} ->
+ wait_for_channel_termination(N-1, TimerRef);
+ {Channel, uncontrolled} ->
+ log(error,
+ "AMQP connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
+ [self(), Channel, Reason]),
wait_for_channel_termination(N-1, TimerRef)
end;
cancel_wait ->
@@ -493,41 +499,38 @@ handle_frame(Type, Channel, Payload,
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- AnalyzedFrame ->
- case get({channel, Channel}) of
- {ChPid, FramingState} ->
- NewAState = process_channel_frame(
- AnalyzedFrame, self(),
- Channel, ChPid, FramingState),
- put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(AnalyzedFrame, ChPid, State);
- undefined ->
- case ?IS_RUNNING(State) of
- true -> send_to_new_channel(
- Channel, AnalyzedFrame, State);
- false -> throw({channel_frame_while_starting,
- Channel, State#v1.connection_state,
- AnalyzedFrame})
- end
- end
+ AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State)
+ end.
+
+process_frame(Frame, Channel, State) ->
+ case get({channel, Channel}) of
+ {ChPid, AState} ->
+ case process_channel_frame(Frame, ChPid, AState) of
+ {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {error, Reason} -> handle_exception(State, Channel, Reason)
+ end;
+ undefined when ?IS_RUNNING(State) ->
+ ok = create_channel(Channel, State),
+ process_frame(Frame, Channel, State);
+ undefined ->
+ throw({channel_frame_while_starting,
+ Channel, State#v1.connection_state, Frame})
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- State;
+ control_throttle(State);
post_process_frame({method, MethodName, _}, _ChPid,
State = #v1{connection = #connection{
protocol = Protocol}}) ->
case Protocol:method_has_content(MethodName) of
true -> erlang:bump_reductions(2000),
- case State#v1.connection_state of
- blocking -> State#v1{connection_state = blocked};
- _ -> State
- end;
- false -> State
+ maybe_block(control_throttle(State));
+ false -> control_throttle(State)
end;
post_process_frame(_Frame, _ChPid, State) ->
- State.
+ control_throttle(State).
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
@@ -695,10 +698,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- State1 = internal_conserve_memory(
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ State1 = control_throttle(
State#v1{connection_state = running,
- connection = NewConnection}),
+ connection = NewConnection,
+ conserve_memory = Conserve}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -830,6 +834,12 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
fun ([{_, I}]) -> I end);
i(state, #v1{connection_state = S}) ->
S;
+i(last_blocked_by, #v1{last_blocked_by = By}) ->
+ By;
+i(last_blocked_age, #v1{last_blocked_at = never}) ->
+ infinity;
+i(last_blocked_age, #v1{last_blocked_at = T}) ->
+ timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) ->
length(all_channels());
i(protocol, #v1{connection = #connection{protocol = none}}) ->
@@ -885,7 +895,7 @@ cert_info(F, Sock) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame, State) ->
+create_channel(Channel, State) ->
#v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
connection = #connection{protocol = Protocol,
@@ -898,23 +908,19 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
VHost, Capabilities, Collector}),
MRef = erlang:monitor(process, ChPid),
- NewAState = process_channel_frame(AnalyzedFrame, self(),
- Channel, ChPid, AState),
- put({channel, Channel}, {ChPid, NewAState}),
put({ch_pid, ChPid}, {Channel, MRef}),
- State.
+ put({channel, Channel}, {ChPid, AState}),
+ ok.
-process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
+process_channel_frame(Frame, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> NewAState;
+ {ok, NewAState} -> {ok, NewAState};
{ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- NewAState;
- {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid,
- Method, Content),
- NewAState;
- {error, Reason} -> ErrPid ! {channel_exit, Channel,
- Reason},
- AState
+ {ok, NewAState};
+ {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
+ ChPid, Method, Content),
+ {ok, NewAState};
+ {error, Reason} -> {error, Reason}
end.
handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 9821ae7b86..8c0ebcbe94 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_registry).
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index cda3ccbe0f..237ab78cac 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_restartable_sup).
@@ -28,7 +28,8 @@
-ifdef(use_specs).
--spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: (atom(), rabbit_types:mfargs()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 31f5ad14ea..f4bbda0f7a 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -11,28 +11,24 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_router).
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--export([deliver/2, match_bindings/2, match_routing_key/2]).
+-export([match_bindings/2, match_routing_key/2]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([routing_key/0, routing_result/0, match_result/0]).
+-export_type([routing_key/0, match_result/0]).
-type(routing_key() :: binary()).
--type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
--type(qpids() :: [pid()]).
-type(match_result() :: [rabbit_types:binding_destination()]).
--spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) ->
- {routing_result(), qpids()}).
-spec(match_bindings/2 :: (rabbit_types:binding_source(),
fun ((rabbit_types:binding()) -> boolean())) ->
match_result()).
@@ -44,38 +40,6 @@
%%----------------------------------------------------------------------------
-deliver([], #delivery{mandatory = false,
- immediate = false}) ->
- %% /dev/null optimisation
- {routed, []};
-
-deliver(QNames, Delivery = #delivery{mandatory = false,
- immediate = false}) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver will deliver the message to the queue
- %% process asynchronously, and return true, which means all the
- %% QPids will always be returned. It is therefore safe to use a
- %% fire-and-forget cast here and return the QPids - the semantics
- %% is preserved. This scales much better than the non-immediate
- %% case below.
- QPids = lookup_qpids(QNames),
- delegate:invoke_no_result(
- QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
- {routed, QPids};
-
-deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
- immediate = Immediate}) ->
- QPids = lookup_qpids(QNames),
- {Success, _} =
- delegate:invoke(QPids,
- fun (Pid) ->
- rabbit_amqqueue:deliver(Pid, Delivery)
- end),
- {Routed, Handled} =
- lists:foldl(fun fold_deliveries/2, {false, []}, Success),
- check_delivery(Mandatory, Immediate, {Routed, Handled}).
-
-
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same source
match_bindings(SrcName, Match) ->
@@ -104,26 +68,6 @@ match_routing_key(SrcName, [_|_] = RoutingKeys) ->
%%--------------------------------------------------------------------
-fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]};
-fold_deliveries({_, false},{_, Handled}) -> {true, Handled}.
-
-%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
-check_delivery(true, _ , {false, []}) -> {unroutable, []};
-check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
-check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
-
-%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
-%% expensive for the reasons explained in rabbit_misc:dirty_read/1.
-lookup_qpids(QNames) ->
- lists:foldl(fun (QName, QPids) ->
- case ets:lookup(rabbit_queue, QName) of
- [#amqqueue{pid = QPid, slave_pids = SPids}] ->
- [QPid | SPids ++ QPids];
- [] ->
- QPids
- end
- end, [], QNames).
-
%% Normally we'd call mnesia:dirty_select/2 here, but that is quite
%% expensive for the same reasons as above, and, additionally, due to
%% mnesia 'fixing' the table with ets:safe_fixtable/2, which is wholly
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 963294d924..e8beecfe0a 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_sasl_report_file_h).
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index e0defa9e96..3025d981d4 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_ssl).
@@ -21,7 +21,7 @@
-include_lib("public_key/include/public_key.hrl").
-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
--export([peer_cert_subject_item/2]).
+-export([peer_cert_subject_items/2]).
%%--------------------------------------------------------------------------
@@ -34,8 +34,8 @@
-spec(peer_cert_issuer/1 :: (certificate()) -> string()).
-spec(peer_cert_subject/1 :: (certificate()) -> string()).
-spec(peer_cert_validity/1 :: (certificate()) -> string()).
--spec(peer_cert_subject_item/2 ::
- (certificate(), tuple()) -> string() | 'not_found').
+-spec(peer_cert_subject_items/2 ::
+ (certificate(), tuple()) -> [string()] | 'not_found').
-endif.
@@ -59,8 +59,8 @@ peer_cert_subject(Cert) ->
format_rdn_sequence(Subject)
end, Cert).
-%% Return a part of the certificate's subject.
-peer_cert_subject_item(Cert, Type) ->
+%% Return the parts of the certificate's subject.
+peer_cert_subject_items(Cert, Type) ->
cert_info(fun(#'OTPCertificate' {
tbsCertificate = #'OTPTBSCertificate' {
subject = Subject }}) ->
@@ -72,9 +72,8 @@ peer_cert_validity(Cert) ->
cert_info(fun(#'OTPCertificate' {
tbsCertificate = #'OTPTBSCertificate' {
validity = {'Validity', Start, End} }}) ->
- lists:flatten(
- io_lib:format("~s - ~s", [format_asn1_value(Start),
- format_asn1_value(End)]))
+ rabbit_misc:format("~s - ~s", [format_asn1_value(Start),
+ format_asn1_value(End)])
end, Cert).
%%--------------------------------------------------------------------------
@@ -89,8 +88,8 @@ find_by_type(Type, {rdnSequence, RDNs}) ->
case [V || #'AttributeTypeAndValue'{type = T, value = V}
<- lists:flatten(RDNs),
T == Type] of
- [Val] -> format_asn1_value(Val);
- [] -> not_found
+ [] -> not_found;
+ L -> [format_asn1_value(V) || V <- L]
end.
%%--------------------------------------------------------------------------
@@ -150,11 +149,12 @@ escape_rdn_value([$ ], middle) ->
escape_rdn_value([C | S], middle) when C =:= $"; C =:= $+; C =:= $,; C =:= $;;
C =:= $<; C =:= $>; C =:= $\\ ->
[$\\, C | escape_rdn_value(S, middle)];
-escape_rdn_value([C | S], middle) when C < 32 ; C =:= 127 ->
- %% only U+0000 needs escaping, but for display purposes it's handy
- %% to escape all non-printable chars
- lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++
- escape_rdn_value(S, middle);
+escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 ->
+ %% Of ASCII characters only U+0000 needs escaping, but for display
+ %% purposes it's handy to escape all non-printable chars. All non-ASCII
+ %% characters get converted to UTF-8 sequences and then escaped. We've
+ %% already got a UTF-8 sequence here, so just escape it.
+ rabbit_misc:format("\\~2.16.0B", [C]) ++ escape_rdn_value(S, middle);
escape_rdn_value([C | S], middle) ->
[C | escape_rdn_value(S, middle)].
@@ -167,6 +167,10 @@ format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
Min1, Min2, S1, S2, $Z]}) ->
io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
[Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
+%% We appear to get an untagged value back for an ia5string
+%% (e.g. domainComponent).
+format_asn1_value(V) when is_list(V) ->
+ V;
format_asn1_value(V) ->
io_lib:format("~p", [V]).
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 802ea5e2e7..0965e3b36e 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_sup).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 00d46f5a56..433ed9cb59 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_tests).
@@ -59,7 +59,7 @@ all_tests() ->
passed.
maybe_run_cluster_dependent_tests() ->
- SecondaryNode = rabbit_misc:makenode("hare"),
+ SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
pong -> passed = run_cluster_dependent_tests(SecondaryNode);
@@ -71,10 +71,13 @@ maybe_run_cluster_dependent_tests() ->
run_cluster_dependent_tests(SecondaryNode) ->
SecondaryNodeS = atom_to_list(SecondaryNode),
+ cover:stop(SecondaryNode),
ok = control_action(stop_app, []),
ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS]),
ok = control_action(start_app, []),
+ cover:start(SecondaryNode),
+ ok = control_action(start_app, SecondaryNode, [], []),
io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
passed = test_delegates_async(SecondaryNode),
@@ -859,7 +862,7 @@ test_cluster_management() ->
"invalid2@invalid"]),
ok = assert_ram_node(),
- SecondaryNode = rabbit_misc:makenode("hare"),
+ SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
pong -> passed = test_cluster_management2(SecondaryNode);
pang -> io:format("Skipping clustering tests with node ~p~n",
@@ -889,6 +892,14 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(stop_app, []),
ok = assert_ram_node(),
+ %% ram node will not start by itself
+ ok = control_action(stop_app, []),
+ ok = control_action(stop_app, SecondaryNode, [], []),
+ {error, _} = control_action(start_app, []),
+ ok = control_action(start_app, SecondaryNode, [], []),
+ ok = control_action(start_app, []),
+ ok = control_action(stop_app, []),
+
%% change cluster config while remaining in same cluster
ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]),
ok = control_action(start_app, []),
@@ -897,8 +908,7 @@ test_cluster_management2(SecondaryNode) ->
%% join non-existing cluster as a ram node
ok = control_action(force_cluster, ["invalid1@invalid",
"invalid2@invalid"]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
+ {error, _} = control_action(start_app, []),
ok = assert_ram_node(),
%% join empty cluster as a ram node (converts to disc)
@@ -953,7 +963,9 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
+ cover:stop(SecondaryNode),
ok = control_action(reset, []),
+ cover:start(SecondaryNode),
%% attempt to leave cluster when no other node is alive
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
@@ -970,7 +982,15 @@ test_cluster_management2(SecondaryNode) ->
%% leave system clustered, with the secondary node as a ram node
ok = control_action(force_reset, []),
ok = control_action(start_app, []),
- ok = control_action(force_reset, SecondaryNode, [], []),
+ %% Yes, this is rather ugly. But since we're a clustered Mnesia
+ %% node and we're telling another clustered node to reset itself,
+ %% we will get disconnected half way through causing a
+ %% badrpc. This never happens in real life since rabbitmqctl is
+ %% not a clustered Mnesia node.
+ cover:stop(SecondaryNode),
+ {badrpc, nodedown} = control_action(force_reset, SecondaryNode, [], []),
+ pong = net_adm:ping(SecondaryNode),
+ cover:start(SecondaryNode),
ok = control_action(cluster, SecondaryNode, [NodeS], []),
ok = control_action(start_app, SecondaryNode, [], []),
@@ -1779,10 +1799,10 @@ test_msg_store() ->
restart_msg_store_empty(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
{MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds),
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
{Cap, MSCState} = msg_store_client_init_capture(
?PERSISTENT_MSG_STORE, Ref),
- Ref2 = rabbit_guid:guid(),
+ Ref2 = rabbit_guid:gen(),
{Cap2, MSC2State} = msg_store_client_init_capture(
?PERSISTENT_MSG_STORE, Ref2),
%% check we don't contain any of the msgs we're about to publish
@@ -1934,7 +1954,7 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) ->
passed.
test_msg_store_confirm_timer() ->
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MsgId = msg_id_bin(1),
Self = self(),
MSCState = rabbit_msg_store:client_init(
@@ -1963,7 +1983,7 @@ msg_store_keep_busy_until_confirm(MsgIds, MSCState) ->
test_msg_store_client_delete_and_terminate() ->
restart_msg_store_empty(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)],
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
ok = msg_store_write(MsgIds, MSCState),
%% test the 'dying client' fast path for writes
@@ -1979,7 +1999,7 @@ test_queue() ->
init_test_queue() ->
TestQueue = test_queue(),
Terms = rabbit_queue_index:shutdown_terms(TestQueue),
- PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()),
+ PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
Res = rabbit_queue_index:recover(
TestQueue, Terms, false,
@@ -2013,7 +2033,7 @@ restart_app() ->
rabbit:start().
queue_index_publish(SeqIds, Persistent, Qi) ->
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MsgStore = case Persistent of
true -> ?PERSISTENT_MSG_STORE;
false -> ?TRANSIENT_MSG_STORE
@@ -2022,7 +2042,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
{A, B = [{_SeqId, LastMsgIdWritten} | _]} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) ->
- MsgId = rabbit_guid:guid(),
+ MsgId = rabbit_guid:gen(),
QiM = rabbit_queue_index:publish(
MsgId, SeqId, #message_properties{}, Persistent, QiN),
ok = rabbit_msg_store:write(MsgId, MsgId, MSCState),
@@ -2045,7 +2065,7 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
test_queue_index_props() ->
with_empty_test_queue(
fun(Qi0) ->
- MsgId = rabbit_guid:guid(),
+ MsgId = rabbit_guid:gen(),
Props = #message_properties{expiry=12345},
Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0),
{[{MsgId, 1, Props, _, _}], Qi2} =
@@ -2222,17 +2242,29 @@ test_amqqueue(Durable) ->
#amqqueue { durable = Durable }.
with_fresh_variable_queue(Fun) ->
- ok = empty_test_queue(),
- VQ = variable_queue_init(test_amqqueue(true), false),
- S0 = rabbit_variable_queue:status(VQ),
- assert_props(S0, [{q1, 0}, {q2, 0},
- {delta, {delta, undefined, 0, undefined}},
- {q3, 0}, {q4, 0},
- {len, 0}]),
- _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
+ Ref = make_ref(),
+ Me = self(),
+ %% Run in a separate process since rabbit_msg_store will send
+ %% bump_credit messages and we want to ignore them
+ spawn_link(fun() ->
+ ok = empty_test_queue(),
+ VQ = variable_queue_init(test_amqqueue(true), false),
+ S0 = rabbit_variable_queue:status(VQ),
+ assert_props(S0, [{q1, 0}, {q2, 0},
+ {delta,
+ {delta, undefined, 0, undefined}},
+ {q3, 0}, {q4, 0},
+ {len, 0}]),
+ _ = rabbit_variable_queue:delete_and_terminate(
+ shutdown, Fun(VQ)),
+ Me ! Ref
+ end),
+ receive
+ Ref -> ok
+ end,
passed.
-publish_and_confirm(QPid, Payload, Count) ->
+publish_and_confirm(Q, Payload, Count) ->
Seqs = lists:seq(1, Count),
[begin
Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
@@ -2240,7 +2272,7 @@ publish_and_confirm(QPid, Payload, Count) ->
Payload),
Delivery = #delivery{mandatory = false, immediate = false,
sender = self(), message = Msg, msg_seq_no = Seq},
- true = rabbit_amqqueue:deliver(QPid, Delivery)
+ {routed, _} = rabbit_amqqueue:deliver([Q], Delivery)
end || Seq <- Seqs],
wait_for_confirms(gb_sets:from_list(Seqs)).
@@ -2477,7 +2509,7 @@ test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
{new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
- publish_and_confirm(QPid, <<>>, Count),
+ publish_and_confirm(Q, <<>>, Count),
exit(QPid, kill),
MRef = erlang:monitor(process, QPid),
@@ -2507,7 +2539,7 @@ test_variable_queue_delete_msg_store_files_callback() ->
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
Payload = <<0:8388608>>, %% 1MB
Count = 30,
- publish_and_confirm(QPid, Payload, Count),
+ publish_and_confirm(Q, Payload, Count),
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl
index abcbe0b6d5..72c07b51ec 100644
--- a/src/rabbit_tests_event_receiver.erl
+++ b/src/rabbit_tests_event_receiver.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_tests_event_receiver).
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 58079ccf47..3a5b96de4f 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_trace).
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 2db960acc9..732c29b6b6 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_types).
@@ -28,12 +28,9 @@
binding/0, binding_source/0, binding_destination/0,
amqqueue/0, exchange/0,
connection/0, protocol/0, user/0, internal_user/0,
- username/0, password/0, password_hash/0, ok/1, error/1,
- ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
- connection_exit/0]).
-
--type(channel_exit() :: no_return()).
--type(connection_exit() :: no_return()).
+ username/0, password/0, password_hash/0,
+ ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
+ channel_exit/0, connection_exit/0, mfargs/0]).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -156,4 +153,9 @@
-type(ok_or_error2(A, B) :: ok(A) | error(B)).
-type(ok_pid_or_error() :: ok_or_error2(pid(), any())).
+-type(channel_exit() :: no_return()).
+-type(connection_exit() :: no_return()).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-endif. % use_specs
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 717d94a802..80f50b38b3 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_upgrade).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index e0ca8cbb72..9f2535bd04 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_upgrade_functions).
@@ -35,6 +35,7 @@
-rabbit_upgrade({gm, mnesia, []}).
-rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}).
-rabbit_upgrade({mirrored_supervisor, mnesia, []}).
+-rabbit_upgrade({topic_trie_node, mnesia, []}).
%% -------------------------------------------------------------------
@@ -54,6 +55,7 @@
-spec(gm/0 :: () -> 'ok').
-spec(exchange_scratch/0 :: () -> 'ok').
-spec(mirrored_supervisor/0 :: () -> 'ok').
+-spec(topic_trie_node/0 :: () -> 'ok').
-endif.
@@ -177,6 +179,12 @@ mirrored_supervisor() ->
[{record_name, mirrored_sup_childspec},
{attributes, [key, mirroring_pid, childspec]}]).
+topic_trie_node() ->
+ create(rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, [trie_node, edge_count, binding_count]},
+ {type, ordered_set}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 63a0927f77..52eb168a42 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_variable_queue).
@@ -434,7 +434,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, Terms1} =
case proplists:get_value(persistent_ref, Terms) of
- undefined -> {rabbit_guid:guid(), []};
+ undefined -> {rabbit_guid:gen(), []};
PRef1 -> {PRef1, Terms}
end,
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
@@ -860,7 +860,8 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
Res.
msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
- msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback).
+ msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
+ Callback).
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
@@ -870,17 +871,23 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end).
+ fun (MSCState1) ->
+ rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
+ end).
msg_store_read(MSCState, IsPersistent, MsgId) ->
with_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end).
+ fun (MSCState1) ->
+ rabbit_msg_store:read(MsgId, MSCState1)
+ end).
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
+ fun (MCSState1) ->
+ rabbit_msg_store:remove(MsgIds, MCSState1)
+ end).
msg_store_close_fds(MSCState, IsPersistent) ->
with_msg_store_state(
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index f6bcbb7fd5..7545d81362 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_version).
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 38bb76b03b..5548ef6d05 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_vhost).
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 091b50e4c6..dc74b2f5b0 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_writer).
@@ -129,6 +129,9 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
ok = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
+handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ rabbit_amqqueue:notify_sent_queue_down(QPid),
+ State;
handle_message({inet_reply, _, ok}, State) ->
State;
handle_message({inet_reply, _, Status}, _State) ->
@@ -169,12 +172,10 @@ call(Pid, Msg) ->
%%---------------------------------------------------------------------------
assemble_frame(Channel, MethodRecord, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, none),
rabbit_binary_generator:build_simple_method_frame(
Channel, MethodRecord, Protocol).
assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, Content),
MethodName = rabbit_misc:method_record_type(MethodRecord),
true = Protocol:method_has_content(MethodName), % assertion
MethodFrame = rabbit_binary_generator:build_simple_method_frame(
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index f75da87221..a2f4fae980 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -41,7 +41,7 @@
%% 5) normal, and {shutdown, _} exit reasons are all treated the same
%% (i.e. are regarded as normal exits)
%%
-%% All modifications are (C) 2010-2011 VMware, Inc.
+%% All modifications are (C) 2010-2012 VMware, Inc.
%%
%% %CopyrightBegin%
%%
@@ -717,8 +717,8 @@ do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
ok;
{error, normal} ->
case Child#child.restart_type of
- permanent -> ReportError(normal);
- {permanent, _Delay} -> ReportError(normal);
+ permanent -> ReportError(normal, Child);
+ {permanent, _Delay} -> ReportError(normal, Child);
_ -> ok
end;
{error, OtherReason} ->
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 0d50683db7..43a6bc994b 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor).
@@ -54,28 +54,9 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
- try
- %% report
- {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
- {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
- error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
- [rabbit_misc:ntoab(Address), Port,
- rabbit_misc:ntoab(PeerAddress), PeerPort]),
- %% In the event that somebody floods us with connections we can spew
- %% the above message at error_logger faster than it can keep up.
- %% So error_logger's mailbox grows unbounded until we eat all the
- %% memory available and crash. So here's a meaningless synchronous call
- %% to the underlying gen_event mechanism - when it returns the mailbox
- %% is drained.
- gen_event:which_handlers(error_logger),
- %% handle
- file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
- ok = file_handle_cache:obtain()
- catch {inet_error, Reason} ->
- gen_tcp:close(Sock),
- error_logger:error_msg("unable to accept TCP connection: ~p~n",
- [Reason])
- end,
+ %% handle
+ file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
+ ok = file_handle_cache:obtain(),
%% accept more
accept(State);
@@ -86,6 +67,16 @@ handle_info({inet_async, LSock, Ref, {error, closed}},
%% know this will fail.
{stop, normal, State};
+handle_info({inet_async, LSock, Ref, {error, Reason}},
+ State=#state{sock=LSock, ref=Ref}) ->
+ {AddressS, Port} = case inet:sockname(LSock) of
+ {ok, {A, P}} -> {rabbit_misc:ntoab(A), P};
+ {error, _} -> {"unknown", unknown}
+ end,
+ error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n",
+ [AddressS, Port, Reason]),
+ accept(State);
+
handle_info(_Info, State) ->
{noreply, State}.
@@ -97,8 +88,6 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
-inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
-
accept(State = #state{sock=LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
index 4c835598e0..d8844441d2 100644
--- a/src/tcp_acceptor_sup.erl
+++ b/src/tcp_acceptor_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor_sup).
@@ -25,7 +25,11 @@
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
+-spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()).
+
-endif.
%%----------------------------------------------------------------------------
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index ad2a0d02d0..fb01c792f4 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_listener).
@@ -28,9 +28,14 @@
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/8 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(),
- atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ integer(), atom(), mfargs(), mfargs(), string()) ->
+ rabbit_types:ok_pid_or_error()).
+
-endif.
%%--------------------------------------------------------------------
@@ -67,8 +72,9 @@ init({IPAddress, Port, SocketOpts,
label = Label}};
{error, Reason} ->
error_logger:error_msg(
- "failed to start ~s on ~s:~p - ~p~n",
- [Label, rabbit_misc:ntoab(IPAddress), Port, Reason]),
+ "failed to start ~s on ~s:~p - ~p (~s)~n",
+ [Label, rabbit_misc:ntoab(IPAddress), Port,
+ Reason, inet:format_error(Reason)]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 5bff5c2701..9ee921b423 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_listener_sup).
@@ -26,12 +26,16 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/7 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
- mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ mfargs(), mfargs(), mfargs(), string()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(start_link/8 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
- mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ mfargs(), mfargs(), mfargs(), integer(), string()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 5feb146f64..7f4b5049fe 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(test_sup).
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 8973a4f750..fca55f02f1 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% In practice Erlang shouldn't be allowed to grow to more than a half
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 456ff39f47..c9ecccd6b6 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(worker_pool).
@@ -37,10 +37,11 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
--spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
--spec(submit_async/1 ::
- (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
+-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
+-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
-spec(idle/1 :: (any()) -> 'ok').
-endif.
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index d37c3a0fd6..ff356366ac 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(worker_pool_sup).
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 78ab4df3ea..1ddcebb23d 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(worker_pool_worker).
@@ -29,12 +29,12 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
--spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
--spec(submit_async/2 ::
- (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
--spec(run/1 :: (fun (() -> A)) -> A;
- ({atom(), atom(), [any()]}) -> any()).
+-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
+-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
+-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-endif.